| # Copyright 2023 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import asyncio |
| import avatar |
| import logging |
| |
| from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices |
| from avatar.pandora_server import AndroidPandoraServer |
| from bumble.core import ( |
| BT_GENERIC_AUDIO_SERVICE, |
| BT_HANDSFREE_AUDIO_GATEWAY_SERVICE, |
| BT_L2CAP_PROTOCOL_ID, |
| BT_RFCOMM_PROTOCOL_ID, |
| ) |
| from bumble.hfp import HfpProtocol |
| from bumble.rfcomm import DLC, Server as RfcommServer |
| from bumble.sdp import ( |
| SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, |
| SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, |
| SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, |
| SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, |
| DataElement, |
| ServiceAttribute, |
| ) |
| from mobly import base_test, test_runner |
| from mobly.asserts import assert_equal # type: ignore |
| from mobly.asserts import assert_in # type: ignore |
| from mobly.asserts import assert_not_equal # type: ignore |
| from mobly.asserts import assert_not_in # type: ignore |
| from pandora.host_pb2 import Connection as PandoraConnection |
| from pandora.security_pb2 import LEVEL2 |
| from typing import Dict, List, Optional, Tuple |
| |
# SDP attribute ID under which make_bumble_ag_sdp_records() publishes the AG feature value.
SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311

# AG feature bitmask sent in the `+BRSF:` response (bit 10 is the only one used here).
HFP_AG_FEATURE_HF_INDICATORS = 0x0400
HFP_AG_FEATURE_DEFAULT = HFP_AG_FEATURE_HF_INDICATORS

# HF feature bit checked against the value received in `AT+BRSF=` (bit 8).
HFP_HF_FEATURE_HF_INDICATORS = 0x0100
# Kept as a string on purpose: it is handed verbatim to `setprop` as the property value.
HFP_HF_FEATURE_DEFAULT = format(0x01B5, '#x')

# Android system properties read/written through adb by the test.
PROPERTY_HF_ENABLED = 'bluetooth.profile.hfp.hf.enabled'
PROPERTY_HF_FEATURES = 'bluetooth.hfp.hf_client_features.config'
PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY = (
    'bluetooth.headset_client.indicator.enhanced_driver_safety.enabled'
)

# Profile version value advertised in the Bumble AG SDP record.
HFP_VERSION_1_7 = 0x0107
| |
| |
class HfpClientTest(base_test.BaseTestClass):  # type: ignore[misc]
    """Mobly/Avatar test of the DUT's HFP client against a Bumble-hosted Audio Gateway.

    The reference (Bumble) device listens on RFCOMM, publishes an AG SDP record and
    then plays the AG side of the AT exchange through `HfpAgServer`, while the test
    asserts on the commands the DUT sends.
    """

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref: BumblePandoraDevice

    def setup_class(self) -> None:
        """Create the Pandora devices and prepare both sides for HFP client testing.

        Raises:
            RuntimeError: if none of the Pandora servers is an `AndroidPandoraServer`,
                because the test needs adb access to set system properties on the DUT.
        """
        self.devices = PandoraDevices(self)
        self.dut, ref, *_ = self.devices
        assert isinstance(ref, BumblePandoraDevice)
        self.ref = ref

        # Enable BR/EDR mode and SSP for Bumble devices.
        self.ref.config.setdefault('classic_enabled', True)
        self.ref.config.setdefault('classic_ssp_enabled', True)
        self.ref.config.setdefault(
            'server',
            {
                'io_capability': 'no_output_no_input',
            },
        )

        # NOTE: this reaches into PandoraDevices' private server list to get an adb handle.
        for server in self.devices._servers:
            if isinstance(server, AndroidPandoraServer):
                self.dut_adb = server.device.adb
                # Enable HFP Client
                self.dut_adb.shell(['setprop', PROPERTY_HF_ENABLED, 'true'])
                # Set HF features if not set yet
                hf_feature_text = self.dut_adb.getprop(PROPERTY_HF_FEATURES)
                if not hf_feature_text:
                    self.dut_adb.shell(['setprop', PROPERTY_HF_FEATURES, HFP_HF_FEATURE_DEFAULT])
                break
        else:
            # Without this, `self.dut_adb` would stay undefined and every test would
            # later fail with an opaque AttributeError instead of a clear message.
            raise RuntimeError('HfpClientTest requires an Android DUT: no AndroidPandoraServer found')

    def teardown_class(self) -> None:
        """Stop every Pandora device/server that `setup_class` managed to create."""
        if self.devices:
            self.devices.stop_all()

    @avatar.asynchronous
    async def setup_test(self) -> None:
        """Reset the DUT and the reference device concurrently."""
        await asyncio.gather(self.dut.reset(), self.ref.reset())

    # TODO(b/286338264): Moving connecting and bonding methods to a shared util scripts
    async def make_classic_connection(self) -> Tuple[PandoraConnection, PandoraConnection]:
        """Establish a BR/EDR connection initiated by the reference device.

        Returns:
            `(dut_ref, ref_dut)`: the connection as seen from the DUT and from the
            reference device respectively.
        """
        dut_ref, ref_dut = await asyncio.gather(
            self.dut.aio.host.WaitConnection(address=self.ref.address),
            self.ref.aio.host.Connect(address=self.dut.address),
        )

        assert_equal(dut_ref.result_variant(), 'connection')
        assert_equal(ref_dut.result_variant(), 'connection')
        assert dut_ref.connection is not None and ref_dut.connection is not None

        return dut_ref.connection, ref_dut.connection

    async def make_classic_bond(self, dut_ref: PandoraConnection, ref_dut: PandoraConnection) -> None:
        """Secure the given connection at classic LEVEL2, initiated from the DUT side.

        Args:
            dut_ref: the connection handle on the DUT.
            ref_dut: the same connection as seen by the reference device.
        """
        dut_ref_sec, ref_dut_sec = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref, classic=LEVEL2),
            self.ref.aio.security.WaitSecurity(connection=ref_dut, classic=LEVEL2),
        )
        assert_equal(dut_ref_sec.result_variant(), 'success')
        assert_equal(ref_dut_sec.result_variant(), 'success')

    async def make_hfp_connection(self) -> HfpProtocol:
        """Bring up an HFP link and return the reference-side protocol object.

        Listens on RFCOMM on the reference device, installs the AG SDP record,
        connects and bonds, then waits for the DUT to open the RFCOMM DLC.

        Returns:
            An `HfpProtocol` wrapping the DLC opened by the DUT.
        """
        # Listen RFCOMM
        dlc_connected = asyncio.get_running_loop().create_future()

        def on_dlc(dlc: DLC) -> None:
            # Only the first DLC is awaited below; resolving an already-done future
            # would raise InvalidStateError inside the RFCOMM callback.
            if not dlc_connected.done():
                dlc_connected.set_result(dlc)

        rfcomm_server = RfcommServer(self.ref.device)  # type: ignore
        channel_number = rfcomm_server.listen(on_dlc)  # type: ignore

        # Setup SDP records
        self.ref.device.sdp_service_records = make_bumble_ag_sdp_records(HFP_VERSION_1_7, channel_number, 0)

        # Connect and pair
        dut_ref, ref_dut = await self.make_classic_connection()
        await self.make_classic_bond(dut_ref, ref_dut)

        # By default, Android HF should auto connect
        dlc = await dlc_connected
        assert isinstance(dlc, DLC)

        return HfpProtocol(dlc)  # type: ignore

    @avatar.parameterized((True,), (False,))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_hf_indicator_setup(self, enhanced_driver_safety_enabled: bool) -> None:
        """Check which HF indicators the DUT announces depending on a system property.

        Args:
            enhanced_driver_safety_enabled: value written to the enhanced-driver-safety
                property before connecting; indicator ID 1 is expected in the DUT's
                `AT+BIND=` list exactly when this is True.
        """
        self.dut_adb.shell(
            [
                'setprop',
                PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY,
                'true' if enhanced_driver_safety_enabled else 'false',
            ]
        )

        ref_dut_hfp_protocol = await self.make_hfp_connection()

        class TestAgServer(HfpAgServer):
            def on_brsf(self, hf_features: int) -> None:
                # HF indicators should be enabled
                assert_not_equal(hf_features & HFP_HF_FEATURE_HF_INDICATORS, 0)
                return super().on_brsf(hf_features)

            def on_bind_list(self, indicators: list[int]) -> None:
                if enhanced_driver_safety_enabled:
                    assert_in(1, indicators)
                else:
                    assert_not_in(1, indicators)
                # The indicator list is all this test needs; stop the serve() loop.
                self.terminated = True

        server = TestAgServer(ref_dut_hfp_protocol, ag_features=HFP_AG_FEATURE_HF_INDICATORS)
        await server.serve()
| |
| |
def make_bumble_ag_sdp_records(
    hfp_version: int, rfcomm_channel: int, ag_sdp_features: int
) -> Dict[int, List[ServiceAttribute]]:
    """Build the SDP record table describing a Hands-Free Audio Gateway service.

    Args:
        hfp_version: value placed in the profile descriptor next to the AG service UUID.
        rfcomm_channel: RFCOMM channel number placed in the protocol descriptor list.
        ag_sdp_features: value stored under `SDP_PROFILE_SUPPORTED_FEATURES_ID`.

    Returns:
        A dict with a single entry, keyed by the record handle 0x00010001, whose value
        is the list of attributes making up that record.
    """
    record_handle = 0x00010001

    service_class_ids = DataElement.sequence(
        [
            DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
            DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
        ]
    )

    # Protocol stack: L2CAP, then RFCOMM on the given channel.
    l2cap_descriptor = DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)])
    rfcomm_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
            DataElement.unsigned_integer_8(rfcomm_channel),
        ]
    )
    protocol_descriptors = DataElement.sequence([l2cap_descriptor, rfcomm_descriptor])

    profile_descriptors = DataElement.sequence(
        [
            DataElement.sequence(
                [
                    DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
                    DataElement.unsigned_integer_16(hfp_version),
                ]
            )
        ]
    )

    attributes = [
        ServiceAttribute(
            SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
            DataElement.unsigned_integer_32(record_handle),
        ),
        ServiceAttribute(SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, service_class_ids),
        ServiceAttribute(SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, protocol_descriptors),
        ServiceAttribute(SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, profile_descriptors),
        ServiceAttribute(
            SDP_PROFILE_SUPPORTED_FEATURES_ID,
            DataElement.unsigned_integer_16(ag_sdp_features),
        ),
    ]
    return {record_handle: attributes}
| |
| |
class HfpAgServer:
    """Minimal scripted HFP Audio Gateway that answers AT commands from an HF.

    Subclasses override the `on_*` handlers to assert on what the HF sends and set
    `terminated` to stop `serve()`.
    """

    # Indicator IDs most recently received in `AT+BIND=<list>`; empty until then.
    enabled_hf_indicators: list[int]
    # Feature bitmask received in `AT+BRSF=`; 0 until the HF sends it.
    hf_features: int

    def __init__(self, protocol: HfpProtocol, ag_features: int = HFP_AG_FEATURE_DEFAULT) -> None:
        """Store the protocol used for I/O and the AG feature mask reported in `+BRSF:`."""
        self.protocol = protocol
        self.ag_features = ag_features
        self.terminated = False
        self.hf_features = 0  # Unknown
        # Initialised so the AT+BIND=? / AT+BIND? handlers cannot hit an AttributeError
        # when they run before any AT+BIND=<list> has been received.
        self.enabled_hf_indicators = []

    def send_response_line(self, response: str) -> None:
        """Send one response line to the HF through the underlying protocol."""
        self.protocol.send_response_line(response)  # type: ignore

    async def serve(self) -> None:
        """Read lines from the HF and dispatch them until `terminated` becomes true.

        `terminated` is only checked between lines, so a handler that sets it stops the
        loop after it returns. Unrecognised commands are answered with `ERROR`.
        """
        while not self.terminated:
            line = await self.protocol.next_line()  # type: ignore

            if line.startswith('AT+BRSF='):
                hf_features = int(line[len('AT+BRSF=') :])
                self.on_brsf(hf_features)
            # The '=?' forms must be tested before the bare '=' prefix they contain.
            elif line.startswith('AT+BIND=?'):
                self.on_bind_read_capabilities()
            elif line.startswith('AT+BIND='):
                # Skip empty tokens so an empty list does not blow up on int('').
                indicators = [int(i) for i in line[len('AT+BIND=') :].split(',') if i.strip()]
                self.on_bind_list(indicators)
            elif line.startswith('AT+BIND?'):
                self.on_bind_read_configuration()
            elif line.startswith('AT+CIND=?'):
                # Test command: answered with the indicator descriptions.
                self.on_cind_test()
            elif line.startswith('AT+CIND?'):
                # Read command: answered with the current indicator values.
                self.on_cind_read()
            # TODO(b/286226902): Implement handlers for these commands
            elif line.startswith(
                (
                    'AT+CLIP=',
                    'AT+VGS=',
                    'AT+BIA=',
                    'AT+CMER=',
                    'AT+XEVENT=',
                    'AT+XAPL=',
                )
            ):
                self.send_response_line('OK')
            else:
                self.send_response_line('ERROR')

    def on_brsf(self, hf_features: int) -> None:
        """Handle `AT+BRSF=`: record the HF features and reply with the AG features."""
        self.hf_features = hf_features
        self.send_response_line(f'+BRSF: {self.ag_features}')
        self.send_response_line('OK')

    # AT+CIND?
    def on_cind_read(self) -> None:
        """Reply with a fixed set of current indicator values."""
        self.send_response_line('+CIND: 0,0,1,4,1,5,0')
        self.send_response_line('OK')

    # AT+CIND=?
    def on_cind_test(self) -> None:
        """Reply with the fixed list of indicator names and their value ranges."""
        self.send_response_line(
            '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
            '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
            '("callheld",(0-2))'
        )
        self.send_response_line('OK')

    # AT+BIND=
    def on_bind_list(self, indicators: list[int]) -> None:
        """Remember a copy of the HF's indicator list and acknowledge it."""
        self.enabled_hf_indicators = indicators[:]
        self.send_response_line('OK')

    # AT+BIND=?
    def on_bind_read_capabilities(self) -> None:
        """Reply with the stored indicator IDs as one comma-separated `+BIND:` line."""
        self.send_response_line('+BIND: ' + ','.join(map(str, self.enabled_hf_indicators)))
        self.send_response_line('OK')

    # AT+BIND?
    def on_bind_read_configuration(self) -> None:
        """Reply with one `+BIND: <id>,1` line per stored indicator, then `OK`."""
        for i in self.enabled_hf_indicators:
            self.send_response_line(f'+BIND: {i},1')
        self.send_response_line('OK')
| |
| |
# Script entry point: run the tests in this module through the Mobly test runner.
if __name__ == '__main__':
    # DEBUG-level logging on the root logger.
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore