blob: d6555905297612d0b76c542617ffaf03ca4998e5 [file] [log] [blame]
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import avatar
import logging
from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from avatar.pandora_server import AndroidPandoraServer
from bumble.core import (
BT_GENERIC_AUDIO_SERVICE,
BT_HANDSFREE_AUDIO_GATEWAY_SERVICE,
BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID,
)
from bumble.hfp import HfpProtocol
from bumble.rfcomm import DLC, Server as RfcommServer
from bumble.sdp import (
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement,
ServiceAttribute,
)
from mobly import base_test, test_runner
from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_in # type: ignore
from mobly.asserts import assert_not_equal # type: ignore
from mobly.asserts import assert_not_in # type: ignore
from pandora.host_pb2 import Connection as PandoraConnection
from pandora.security_pb2 import LEVEL2
from typing import Dict, List, Optional, Tuple
SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311
HFP_AG_FEATURE_HF_INDICATORS = 1 << 10
HFP_AG_FEATURE_DEFAULT = HFP_AG_FEATURE_HF_INDICATORS
HFP_HF_FEATURE_HF_INDICATORS = 1 << 8
HFP_HF_FEATURE_DEFAULT = hex(0x01B5)
PROPERTY_HF_ENABLED = 'bluetooth.profile.hfp.hf.enabled'
PROPERTY_HF_FEATURES = 'bluetooth.hfp.hf_client_features.config'
PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY = 'bluetooth.headset_client.indicator.enhanced_driver_safety.enabled'
HFP_VERSION_1_7 = 0x0107
class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc]
devices: Optional[PandoraDevices] = None
# pandora devices.
dut: PandoraDevice
ref: BumblePandoraDevice
def setup_class(self) -> None:
self.devices = PandoraDevices(self)
self.dut, ref, *_ = self.devices
assert isinstance(ref, BumblePandoraDevice)
self.ref = ref
# Enable BR/EDR mode and SSP for Bumble devices.
self.ref.config.setdefault('classic_enabled', True)
self.ref.config.setdefault('classic_ssp_enabled', True)
self.ref.config.setdefault(
'server',
{
'io_capability': 'no_output_no_input',
},
)
for server in self.devices._servers:
if isinstance(server, AndroidPandoraServer):
self.dut_adb = server.device.adb
# Enable HFP Client
self.dut_adb.shell(['setprop', PROPERTY_HF_ENABLED, 'true'])
# Set HF features if not set yet
hf_feature_text = self.dut_adb.getprop(PROPERTY_HF_FEATURES)
if len(hf_feature_text) == 0:
self.dut_adb.shell(['setprop', PROPERTY_HF_FEATURES, HFP_HF_FEATURE_DEFAULT])
break
def teardown_class(self) -> None:
if self.devices:
self.devices.stop_all()
@avatar.asynchronous
async def setup_test(self) -> None:
await asyncio.gather(self.dut.reset(), self.ref.reset())
# TODO(b/286338264): Moving connecting and bonding methods to a shared util scripts
async def make_classic_connection(self) -> Tuple[PandoraConnection, PandoraConnection]:
dut_ref, ref_dut = await asyncio.gather(
self.dut.aio.host.WaitConnection(address=self.ref.address),
self.ref.aio.host.Connect(address=self.dut.address),
)
assert_equal(dut_ref.result_variant(), 'connection')
assert_equal(ref_dut.result_variant(), 'connection')
assert dut_ref.connection is not None and ref_dut.connection is not None
return dut_ref.connection, ref_dut.connection
async def make_classic_bond(self, dut_ref: PandoraConnection, ref_dut: PandoraConnection) -> None:
dut_ref_sec, ref_dut_sec = await asyncio.gather(
self.dut.aio.security.Secure(connection=dut_ref, classic=LEVEL2),
self.ref.aio.security.WaitSecurity(connection=ref_dut, classic=LEVEL2),
)
assert_equal(dut_ref_sec.result_variant(), 'success')
assert_equal(ref_dut_sec.result_variant(), 'success')
async def make_hfp_connection(self) -> HfpProtocol:
# Listen RFCOMM
dlc_connected = asyncio.get_running_loop().create_future()
def on_dlc(dlc: DLC) -> None:
dlc_connected.set_result(dlc)
rfcomm_server = RfcommServer(self.ref.device) # type: ignore
channel_number = rfcomm_server.listen(on_dlc) # type: ignore
# Setup SDP records
self.ref.device.sdp_service_records = make_bumble_ag_sdp_records(HFP_VERSION_1_7, channel_number, 0)
# Connect and pair
dut_ref, ref_dut = await self.make_classic_connection()
await self.make_classic_bond(dut_ref, ref_dut)
# By default, Android HF should auto connect
dlc = await dlc_connected
assert isinstance(dlc, DLC)
return HfpProtocol(dlc) # type: ignore
@avatar.parameterized((True,), (False,)) # type: ignore[misc]
@avatar.asynchronous
async def test_hf_indicator_setup(self, enhanced_driver_safety_enabled: bool) -> None:
if enhanced_driver_safety_enabled:
self.dut_adb.shell(['setprop', PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY, 'true'])
else:
self.dut_adb.shell(['setprop', PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY, 'false'])
ref_dut_hfp_protocol = await self.make_hfp_connection()
class TestAgServer(HfpAgServer):
def on_brsf(self, hf_features: int) -> None:
# HF indicators should be enabled
assert_not_equal(hf_features & HFP_HF_FEATURE_HF_INDICATORS, 0)
return super().on_brsf(hf_features)
def on_bind_list(self, indicators: list[int]) -> None:
if enhanced_driver_safety_enabled:
assert_in(1, indicators)
else:
assert_not_in(1, indicators)
self.terminated = True
server = TestAgServer(ref_dut_hfp_protocol, ag_features=HFP_AG_FEATURE_HF_INDICATORS)
await server.serve()
def make_bumble_ag_sdp_records(
hfp_version: int, rfcomm_channel: int, ag_sdp_features: int
) -> Dict[int, List[ServiceAttribute]]:
return {
0x00010001: [
ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(0x00010001),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
DataElement.sequence(
[
DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
DataElement.unsigned_integer_8(rfcomm_channel),
]
),
]
),
),
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
DataElement.unsigned_integer_16(hfp_version),
]
)
]
),
),
ServiceAttribute(
SDP_PROFILE_SUPPORTED_FEATURES_ID,
DataElement.unsigned_integer_16(ag_sdp_features),
),
]
}
class HfpAgServer:
enabled_hf_indicators: list[int]
hf_features: int
def __init__(self, protocol: HfpProtocol, ag_features: int = HFP_AG_FEATURE_DEFAULT) -> None:
self.protocol = protocol
self.ag_features = ag_features
self.terminated = False
self.hf_features = 0 # Unknown
def send_response_line(self, response: str) -> None:
self.protocol.send_response_line(response) # type: ignore
async def serve(self) -> None:
while not self.terminated:
line = await self.protocol.next_line() # type: ignore
if line.startswith('AT+BRSF='):
hf_features = int(line[len('AT+BRSF=') :])
self.on_brsf(hf_features)
elif line.startswith('AT+BIND=?'):
self.on_bind_read_capabilities()
elif line.startswith('AT+BIND='):
indicators = [int(i) for i in line[len('AT+BIND=') :].split(',')]
self.on_bind_list(indicators)
elif line.startswith('AT+BIND?'):
self.on_bind_read_configuration()
elif line.startswith('AT+CIND=?'):
self.on_cind_read()
elif line.startswith('AT+CIND?'):
self.on_cind_test()
# TODO(b/286226902): Implement handlers for these commands
elif line.startswith(
(
'AT+CLIP=',
'AT+VGS=',
'AT+BIA=',
'AT+CMER=',
'AT+XEVENT=',
'AT+XAPL=',
)
):
self.send_response_line('OK')
else:
self.send_response_line('ERROR')
def on_brsf(self, hf_features: int) -> None:
self.hf_features = hf_features
self.send_response_line(f'+BRSF: {self.ag_features}')
self.send_response_line('OK')
# AT+CIND?
def on_cind_read(self) -> None:
self.send_response_line('+CIND: 0,0,1,4,1,5,0')
self.send_response_line('OK')
# AT+CIND=?
def on_cind_test(self) -> None:
self.send_response_line(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
'("callheld",(0-2))'
)
self.send_response_line('OK')
# AT+BIND=
def on_bind_list(self, indicators: list[int]) -> None:
self.enabled_hf_indicators = indicators[:]
self.send_response_line('OK')
# AT+BIND=?
def on_bind_read_capabilities(self) -> None:
self.send_response_line('+BIND: ' + ','.join(map(str, self.enabled_hf_indicators)))
self.send_response_line('OK')
# AT+BIND?
def on_bind_read_configuration(self) -> None:
for i in self.enabled_hf_indicators:
self.send_response_line(f'+BIND: {i},1')
self.send_response_line('OK')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
test_runner.main() # type: ignore