blob: 6714e5d6443ce8bfb01c5d8da13b039ec02e81c4 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import grpc
from blueberry.facade.topshim import facade_pb2
from blueberry.facade.topshim import facade_pb2_grpc
from blueberry.tests.topshim.lib.async_closable import AsyncClosable
from google.protobuf import empty_pb2 as empty_proto
class HfpClient(AsyncClosable):
"""
Wrapper gRPC interface to the HFP Service
"""
# Timeout for async wait
DEFAULT_TIMEOUT = 2
__task_list = []
__channel = None
__hfp_stub = None
def __init__(self, port=8999):
self.__channel = grpc.aio.insecure_channel("localhost:%d" % port)
self.__hfp_stub = facade_pb2_grpc.HfpServiceStub(self.__channel)
async def close(self):
"""
Terminate the current tasks.
"""
for task in self.__task_list:
task.cancel()
task = None
self.__task_list.clear()
await self.__channel.close()
async def start_slc(self, address):
"""
"""
await self.__hfp_stub.StartSlc(
facade_pb2.StartSlcRequest(connection=facade_pb2.Connection(cookie=address.encode())))
async def stop_slc(self, address):
"""
"""
await self.__hfp_stub.StopSlc(
facade_pb2.StopSlcRequest(connection=facade_pb2.Connection(cookie=address.encode())))
async def connect_audio(self, address, is_sco_offload_enabled=False, force_cvsd=False):
"""
"""
await self.__hfp_stub.ConnectAudio(
facade_pb2.ConnectAudioRequest(
connection=facade_pb2.Connection(cookie=address.encode()),
is_sco_offload_enabled=is_sco_offload_enabled,
force_cvsd=force_cvsd))
async def disconnect_audio(self, address):
"""
"""
await self.__hfp_stub.DisconnectAudio(
facade_pb2.DisconnectAudioRequest(connection=facade_pb2.Connection(cookie=address.encode())))
async def set_volume(self, address, volume):
"""
"""
await self.__hfp_stub.DisconnectAudio(
facade_pb2.DisconnectAudioRequest(connection=facade_pb2.Connection(cookie=address.encode()), volume=volume))