blob: 7017ba389e0f33b5f45b0089cead0471f65524ba [file] [log] [blame]
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
from avatar import BumbleDevice, PandoraDevice, PandoraDevices, asynchronous
from bumble.gatt import GATT_ASHA_SERVICE
from mobly import base_test, test_runner
from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_in # type: ignore
from pandora.host_pb2 import DataTypes
class ASHATest(base_test.BaseTestClass): # type: ignore[misc]
ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str()
dut: PandoraDevice
ref: BumbleDevice
def setup_class(self) -> None:
dut, ref = PandoraDevices(self)
assert isinstance(ref, BumbleDevice)
self.dut, self.ref = dut, ref
@asynchronous
async def setup_test(self) -> None:
async def reset(device: PandoraDevice) -> None:
await device.aio.host.FactoryReset()
device.address = (await device.aio.host.ReadLocalAddress(wait_for_ready=True)).address # type: ignore[assignment]
await asyncio.gather(reset(self.dut), reset(self.ref))
def test_ASHA_advertising(self) -> None:
complete_local_name = 'Bumble'
protocol_version = 0x01
capability = 0x00
hisyncid = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8]
truncated_hisyncid = hisyncid[:4]
self.ref.asha.Register(capability=capability, hisyncid=hisyncid)
advertisement = self.ref.host.Advertise(
legacy=True,
data=DataTypes(
complete_local_name=complete_local_name, incomplete_service_class_uuids16=[ASHATest.ASHA_UUID]
),
)
scan = self.dut.host.Scan()
scan_result = next((x for x in scan if x.data.complete_local_name == complete_local_name))
logging.debug(f"scan_response.data: {scan_result}")
advertisement.cancel()
scan.cancel()
assert_in(ASHATest.ASHA_UUID, scan_result.data.service_data_uuid16)
assert_equal(type(scan_result.data.complete_local_name), str)
expected_advertisement_data = (
"{:02x}".format(protocol_version)
+ "{:02x}".format(capability)
+ "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
)
assert_equal(expected_advertisement_data, (scan_result.data.service_data_uuid16[ASHATest.ASHA_UUID]).hex())
def test_ASHA_scan_response(self) -> None:
complete_local_name = 'Bumble'
protocol_version = 0x01
capability = 0x00
hisyncid = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8]
truncated_hisyncid = hisyncid[:4]
self.ref.asha.Register(capability=capability, hisyncid=hisyncid)
advertisement = self.ref.host.Advertise(
legacy=True,
scan_response_data=DataTypes(
complete_local_name=complete_local_name, incomplete_service_class_uuids16=[ASHATest.ASHA_UUID]
),
)
scan = self.dut.host.Scan()
scan_response = next((x for x in scan if x.data.complete_local_name == complete_local_name))
logging.debug(f"scan_response.data: {scan_response}")
advertisement.cancel()
scan.cancel()
assert_in(ASHATest.ASHA_UUID, scan_response.data.service_data_uuid16)
expected_advertisement_data = (
"{:02x}".format(protocol_version)
+ "{:02x}".format(capability)
+ "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
)
assert_equal(expected_advertisement_data, (scan_response.data.service_data_uuid16[ASHATest.ASHA_UUID]).hex())
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
test_runner.main() # type: ignore