# blob: fe6f4844ddf6a441afc031f14f12862abec6e909 [file] [log] [blame]
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Map Bluetooth PTS Man Machine Interface to Pandora gRPC calls."""
# Version string of this module.
__version__ = "0.0.1"
from threading import Thread
from typing import List
import time
import sys
import grpc
from mmi2grpc.a2dp import A2DPProxy
from mmi2grpc.avrcp import AVRCPProxy
from mmi2grpc.gatt import GATTProxy
from mmi2grpc.hfp import HFPProxy
from mmi2grpc.hid import HIDProxy
from mmi2grpc.hogp import HOGPProxy
from mmi2grpc.l2cap import L2CAPProxy
from mmi2grpc.sdp import SDPProxy
from mmi2grpc.sm import SMProxy
from mmi2grpc._helpers import format_proxy
from mmi2grpc._rootcanal import RootCanal
from pandora_experimental.host_grpc import Host
# Default localhost port of the Pandora gRPC server; used by IUT when the
# test arguments do not provide a port.
PANDORA_SERVER_PORT = 8999
# Default RootCanal control port; used by IUT when the test arguments do not
# provide one.
ROOTCANAL_CONTROL_PORT = 6212
# Maximum number of attempts IUT._retry makes before re-raising the gRPC error.
MAX_RETRIES = 10
# How long the IUT.address property waits for the gRPC server to answer.
GRPC_SERVER_INIT_TIMEOUT = 10 # seconds
class IUT:
    """IUT class.

    Handles MMI calls from the PTS and routes them to corresponding profile
    proxy which translates MMI calls to gRPC calls to the IUT.

    Instances are context managers: entering resets the IUT, exiting closes
    the RootCanal connection and drops every cached profile proxy.
    """

    def __init__(self, test: str, args: List[str], **kwargs):
        """Init IUT class for a given test.

        Args:
            test: PTS test id.
            args: test arguments. ``args[0]``, when present, is parsed as the
                Pandora gRPC server port and ``args[1]`` as the RootCanal
                control port; module defaults are used otherwise.
            **kwargs: accepted for interface compatibility and ignored.
        """
        self.pandora_server_port = int(args[0]) if len(args) > 0 else PANDORA_SERVER_PORT
        self.rootcanal_control_port = int(args[1]) if len(args) > 1 else ROOTCANAL_CONTROL_PORT
        self.test = test
        self.rootcanal = None

        # Profile proxies, created lazily by interact().
        self._a2dp = None
        self._avrcp = None
        self._gatt = None
        self._hfp = None
        self._hid = None
        self._hogp = None
        self._l2cap = None
        self._sdp = None
        self._sm = None

    def _open_channel(self):
        """Returns a new insecure gRPC channel to the local Pandora server."""
        return grpc.insecure_channel(f'localhost:{self.pandora_server_port}')

    def __enter__(self):
        """Resets the IUT when starting a PTS test.

        Returns:
            This instance, so that ``with IUT(...) as iut:`` binds the IUT.
        """
        self.rootcanal = RootCanal(port=self.rootcanal_control_port)
        self.rootcanal.reconnect_phone()

        # Note: we don't keep a single gRPC channel instance in the IUT class
        # because reset is allowed to close the gRPC server.
        with self._open_channel() as channel:
            self._retry(Host(channel).HardReset)(wait_for_ready=True)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Closes the RootCanal connection and forgets all profile proxies.

        Exceptions raised inside the ``with`` body are not suppressed.
        """
        # Guard against __exit__ being invoked without a successful __enter__.
        if self.rootcanal is not None:
            self.rootcanal.close()
        self.rootcanal = None

        self._a2dp = None
        self._avrcp = None
        self._gatt = None
        self._hfp = None
        self._l2cap = None
        self._hid = None
        self._hogp = None
        self._sdp = None
        self._sm = None

    def _retry(self, func):
        """Wraps `func` so that gRPC errors are retried.

        The wrapper calls `func` until it succeeds; after each
        `grpc.RpcError` it prints a retry notice and sleeps one second. Once
        MAX_RETRIES attempts have failed, the last error is re-raised.

        Args:
            func: callable to wrap; must expose ``__name__`` (used in the
                retry notice).

        Returns:
            A callable with the same arguments and return value as `func`.
        """

        def wrapper(*args, **kwargs):
            tries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                # Only `grpc.RpcError` is listed: the previous
                # `A or B` expression evaluated to `A` alone anyway.
                except grpc.RpcError:
                    tries += 1
                    if tries >= MAX_RETRIES:
                        raise
                    print(f'Retry {func.__name__}: {tries}/{MAX_RETRIES}')
                    time.sleep(1)

        return wrapper

    @property
    def address(self) -> bytes:
        """Bluetooth MAC address of the IUT.

        Raises a timeout exception after GRPC_SERVER_INIT_TIMEOUT seconds.
        """
        mut_address = None

        def read_local_address():
            with self._open_channel() as channel:
                nonlocal mut_address
                mut_address = self._retry(Host(channel).ReadLocalAddress)(wait_for_ready=True).address

        # The read runs in a helper thread so the wait can be bounded.
        thread = Thread(target=read_local_address)
        thread.start()
        thread.join(timeout=GRPC_SERVER_INIT_TIMEOUT)

        # Still unset (or empty) after the join: report it as a timeout.
        if not mut_address:
            raise Exception("Pandora gRPC server timeout")
        return mut_address

    def interact(self, pts_address: bytes, profile: str, test: str, interaction: str, description: str, style: str,
                 **kwargs) -> str:
        """Routes MMI calls to corresponding profile proxy.

        Profile names are matched exactly; a proxy is instantiated on first
        use and cached on the instance.

        Args:
            pts_address: Bluetooth MAC address of the PTS in bytes.
            profile: Bluetooth profile.
            test: PTS test id.
            interaction: MMI name.
            description: MMI description.
            style: MMI popup style, unused for now.

        Returns:
            Whatever the selected profile proxy's ``interact`` returns.

        Raises:
            AssertionError: if no proxy exists for `profile`; the message
                explains how to add one.
        """
        print(f'{profile} mmi: {interaction}', file=sys.stderr)

        # Handles A2DP and AVDTP MMIs.
        if profile in ('A2DP', 'AVDTP'):
            if self._a2dp is None:
                self._a2dp = A2DPProxy(self._open_channel())
            return self._a2dp.interact(test, interaction, description, pts_address)
        # Handles AVRCP and AVCTP MMIs.
        if profile in ('AVRCP', 'AVCTP'):
            if self._avrcp is None:
                self._avrcp = AVRCPProxy(self._open_channel())
            return self._avrcp.interact(test, interaction, description, pts_address)
        # The single-profile checks below use `==`: `profile in ('GATT')` is a
        # substring test on a plain string and would also match '', 'AT', ...
        # Handles GATT MMIs.
        if profile == 'GATT':
            if self._gatt is None:
                self._gatt = GATTProxy(self._open_channel())
            return self._gatt.interact(test, interaction, description, pts_address)
        # Handles HFP MMIs.
        if profile == 'HFP':
            if self._hfp is None:
                self._hfp = HFPProxy(self._open_channel())
            return self._hfp.interact(test, interaction, description, pts_address)
        # Handles HID MMIs; the HID proxy also receives the RootCanal handle.
        if profile == 'HID':
            if self._hid is None:
                self._hid = HIDProxy(self._open_channel(), self.rootcanal)
            return self._hid.interact(test, interaction, description, pts_address)
        # Handles HOGP MMIs.
        if profile == 'HOGP':
            if self._hogp is None:
                self._hogp = HOGPProxy(self._open_channel())
            return self._hogp.interact(test, interaction, description, pts_address)
        # Instantiates L2CAP proxy and reroutes corresponding MMIs to it.
        if profile == 'L2CAP':
            if self._l2cap is None:
                self._l2cap = L2CAPProxy(self._open_channel())
            return self._l2cap.interact(test, interaction, description, pts_address)
        # Handles SDP MMIs.
        if profile == 'SDP':
            if self._sdp is None:
                self._sdp = SDPProxy(self._open_channel())
            return self._sdp.interact(test, interaction, description, pts_address)
        # Handles SM MMIs.
        if profile == 'SM':
            if self._sm is None:
                self._sm = SMProxy(self._open_channel())
            return self._sm.interact(test, interaction, description, pts_address)

        # Handles unsupported profiles.
        code = format_proxy(profile, interaction, description)
        error_msg = (f'Missing {profile} proxy and mmi: {interaction}\n'
                     f'Create a {profile.lower()}.py in mmi2grpc/:\n\n{code}\n'
                     f'Then, instantiate the corresponding proxy in __init__.py\n'
                     f'Finally, create a {profile.lower()}.proto in proto/pandora/ '
                     f'and generate the corresponding interface.')
        # Raised explicitly (not `assert False`) so it survives `python -O`.
        raise AssertionError(error_msg)