| #!/usr/bin/env python3 |
| # |
| # Copyright 2020 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from blueberry.tests.gd.cert import gd_base_test |
| from blueberry.tests.gd.cert.closable import safeClose |
| from blueberry.tests.gd.cert.truth import assertThat |
| from blueberry.tests.gd.cert.py_hci import PyHci, PyHciAdvertisement |
| from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager |
| from blueberry.facade import common_pb2 as common |
| from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade |
| from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade |
| from blueberry.facade.hci import hci_facade_pb2 as hci_facade |
| from bluetooth_packets_python3 import hci_packets |
| from bluetooth_packets_python3 import RawBuilder |
| from mobly import test_runner |
| |
| |
class LeAclManagerTest(gd_base_test.GdBaseTestClass):
    """Cert tests for LE ACL connections between a DUT and a cert device.

    The DUT is driven through ``PyLeAclManager`` and the cert device through
    ``PyHci``. Most tests set an initiator-address privacy policy on the DUT,
    establish an LE connection in one direction, and then exchange ACL data
    both ways.
    """

    # Address the cert device advertises with / connects from in every test.
    CERT_ADDRESS = '0C:05:04:03:02:01'

    def setup_class(self):
        """Run the base class setup with the DUT on 'HCI_INTERFACES' and the cert on 'HCI'."""
        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')

    def setup_test(self):
        """Create the cert ``PyHci`` and the DUT ``PyLeAclManager`` for one test."""
        # Reset first so teardown_test never touches objects left over from a
        # previous test when the setup below fails part-way.
        self.cert_hci = None
        self.dut_le_acl_manager = None
        super().setup_test()
        self.cert_hci = PyHci(self.cert, acl_streaming=True)
        self.dut_le_acl_manager = PyLeAclManager(self.dut)

    def teardown_test(self):
        """Close the per-test helpers, then always run the base class teardown."""
        # Nested try/finally: a failure while closing one helper must neither
        # skip closing the other nor skip the base class teardown.
        try:
            if self.dut_le_acl_manager is not None:
                safeClose(self.dut_le_acl_manager)
        finally:
            try:
                if self.cert_hci is not None:
                    self.cert_hci.close()
            finally:
                super().teardown_test()

    def _set_dut_privacy_policy(self, **policy_fields):
        """Build a ``PrivacyPolicy`` from ``policy_fields`` and apply it to the DUT initiator address."""
        policy = le_initiator_address_facade.PrivacyPolicy(**policy_fields)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(policy)

    def _cert_address_with_type(self):
        """Return ``CERT_ADDRESS`` wrapped as a ``BluetoothAddressWithType`` of random-device type."""
        return common.BluetoothAddressWithType(
            address=common.BluetoothAddress(address=self.CERT_ADDRESS.encode('utf8')),
            type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS))

    def _start_cert_advertising(self, *extra_args):
        """Create, populate and start an ADV_IND advertisement on the cert.

        Args:
            extra_args: additional positional arguments forwarded unchanged to
                ``PyHci.create_advertisement`` after the advertising properties.

        Returns:
            The advertisement object returned by ``create_advertisement``.
        """
        advertising_handle = 0
        advertisement = self.cert_hci.create_advertisement(
            advertising_handle, self.CERT_ADDRESS, hci_packets.LegacyAdvertisingProperties.ADV_IND, *extra_args)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.set_scan_response(b'Im_A_C')
        advertisement.start()
        return advertisement

    def set_privacy_policy_static(self):
        """Make the DUT use a static address policy and remember that address.

        The address is stored (as bytes) in ``self.dut_address`` so that later
        steps can compare against it or connect to it.
        """
        self.dut_address = b'd0:05:04:03:02:01'
        self._set_dut_privacy_policy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=self.dut_address), type=common.RANDOM_DEVICE_ADDRESS))

    def register_for_event(self, event_code):
        """Send an ``EventRequest`` for ``event_code`` to the cert via ``RequestEvent``."""
        msg = hci_facade.EventRequest(code=int(event_code))
        self.cert.hci.RequestEvent(msg)

    def register_for_le_event(self, event_code):
        """Send an ``EventRequest`` for ``event_code`` to the cert via ``RequestLeSubevent``."""
        msg = hci_facade.EventRequest(code=int(event_code))
        self.cert.hci.RequestLeSubevent(msg)

    def enqueue_hci_command(self, command):
        """Serialize ``command`` and send the bytes to the cert via ``SendCommand``."""
        cmd_bytes = bytes(command.Serialize())
        self.cert.hci.SendCommand(common.Data(payload=cmd_bytes))

    def enqueue_acl_data(self, handle, pb_flag, b_flag, data):
        """Wrap ``data`` in an ACL packet and send it from the cert via ``SendAcl``.

        Args:
            handle: connection handle placed in the ACL packet.
            pb_flag: packet boundary flag for the ACL packet.
            b_flag: broadcast flag for the ACL packet.
            data: raw payload bytes.
        """
        acl = hci_packets.AclBuilder(handle, pb_flag, b_flag, RawBuilder(data))
        self.cert.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))

    def dut_connects(self, check_address):
        """Have the cert advertise and the DUT connect to it.

        On return ``self.dut_le_acl``, ``self.cert_acl_data_stream`` and
        ``self.cert_handle`` are set for use by the data-exchange helpers.

        Args:
            check_address: when true, assert that the peer address seen by the
                cert equals ``self.dut_address``; this requires
                ``set_privacy_policy_static`` to have been called first.
        """
        # Cert Advertises
        self._start_cert_advertising()

        self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote(remote_addr=self._cert_address_with_type())

        cert_connection = self.cert_hci.incoming_le_connection()
        self.cert_acl_data_stream = cert_connection.our_acl_stream
        assertThat(cert_connection.handle).isNotNone()
        if check_address:
            assertThat(cert_connection.peer).isEqualTo(self.dut_address.decode())

        self.cert_handle = cert_connection.handle

    def send_receive_and_check(self):
        """Send ACL data in both directions and assert each side receives the other's payload."""
        # Each payload starts with a 4-byte header: a little-endian length that
        # matches the text that follows, then 0x0007 (presumably the L2CAP
        # channel ID - confirm against the facade).
        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x19\x00\x07\x00SomeAclData from the Cert')

        self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')
        assertThat(self.cert_acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
        assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload)

    def test_dut_connects(self):
        """DUT connects using a static address; the cert checks the peer address."""
        self.set_privacy_policy_static()
        self.dut_connects(check_address=True)
        self.send_receive_and_check()

    def test_dut_connects_resolvable_address(self):
        """DUT connects using the USE_RESOLVABLE_ADDRESS policy, then data is exchanged."""
        self._set_dut_privacy_policy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
            rotation_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
            minimum_rotation_time=7 * 60 * 1000,
            maximum_rotation_time=15 * 60 * 1000)
        self.dut_connects(check_address=False)
        self.send_receive_and_check()

    def test_dut_connects_non_resolvable_address(self):
        """DUT connects using the USE_NON_RESOLVABLE_ADDRESS policy, then data is exchanged."""
        self._set_dut_privacy_policy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_NON_RESOLVABLE_ADDRESS,
            rotation_irk=b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f',
            minimum_rotation_time=8 * 60 * 1000,
            maximum_rotation_time=14 * 60 * 1000)
        self.dut_connects(check_address=False)
        self.send_receive_and_check()

    def test_dut_connects_public_address(self):
        """DUT connects using the USE_PUBLIC_ADDRESS policy, then data is exchanged."""
        self._set_dut_privacy_policy(address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)
        self.dut_connects(check_address=False)
        self.send_receive_and_check()

    def test_dut_connects_public_address_cancelled(self):
        """DUT connects using the USE_PUBLIC_ADDRESS policy, then data is exchanged.

        NOTE(review): despite the name, no connection cancel is exercised here;
        the steps are the same as test_dut_connects_public_address.
        TODO: add the cancel step or drop the duplicate.
        """
        self._set_dut_privacy_policy(address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)
        self.dut_connects(check_address=False)
        self.send_receive_and_check()

    def test_cert_connects(self):
        """DUT advertises, the cert initiates the connection, then data is exchanged."""
        self.set_privacy_policy_static()
        self.dut_le_acl_manager.listen_for_incoming_connections()

        # DUT Advertises
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(b'Im_The_DUT')
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
            peer_address_type=common.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
            peer_address=common.BluetoothAddress(address=b'A6:A5:A4:A3:A2:A1'),
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)

        self.dut.hci_le_advertising_manager.CreateAdvertiser(request)

        # Cert Connects
        self.cert_hci.set_random_le_address(self.CERT_ADDRESS)
        self.cert_hci.initiate_le_connection(self.dut_address.decode())

        # Cert gets ConnectionComplete with a handle and sends ACL data
        cert_connection = self.cert_hci.incoming_le_connection()

        cert_connection.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                             hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x19\x00\x07\x00SomeAclData from the Cert')
        self.cert_acl_data_stream = cert_connection.our_acl_stream
        assertThat(cert_connection.handle).isNotNone()
        self.cert_handle = cert_connection.handle

        # DUT gets a connection complete event and sends and receives
        self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection()
        self.send_receive_and_check()

    def test_recombination_l2cap_packet(self):
        """The DUT emits one payload for data the cert sends as two ACL fragments."""
        self.set_privacy_policy_static()
        self.dut_connects(check_address=True)

        # The first fragment declares a length of 6 but carries only b'Hello';
        # the continuing fragment supplies the missing b'!'.
        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x06\x00\x07\x00Hello')
        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
                              hci_packets.BroadcastFlag.POINT_TO_POINT, b'!')

        assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload)

    def test_background_connection(self):
        """A background connection completes once the cert starts advertising."""
        self.set_privacy_policy_static()

        # Start background and direct connection
        token = self.dut_le_acl_manager.initiate_background_and_direct_connection(
            remote_addr=self._cert_address_with_type())

        # Wait for direct connection timeout
        self.dut_le_acl_manager.wait_for_connection_fail(token)

        # Cert Advertises
        self._start_cert_advertising(155, 165)

        # Check background connection complete
        self.dut_le_acl_manager.complete_outgoing_connection(token)
| |
| |
# Call test_runner.main() only when this file is executed directly.
if __name__ == "__main__":
    test_runner.main()