LeAclManagerTest: Use PyHci abstractions
Bug:230123996
Test: gd/cert/run
Tag: #refactor
Change-Id: If869ce69289859f5faa175afc3672d86a2b9b1fc
Merged-In: If869ce69289859f5faa175afc3672d86a2b9b1fc
(cherry picked from commit e0f4f128d290385e93bb71df1133d8fdd0f7f3be)
diff --git a/system/blueberry/tests/gd/cert/py_hci.py b/system/blueberry/tests/gd/cert/py_hci.py
index 682c528..c62b618 100644
--- a/system/blueberry/tests/gd/cert/py_hci.py
+++ b/system/blueberry/tests/gd/cert/py_hci.py
@@ -70,6 +70,42 @@
return self.our_acl_stream.get_event_queue()
+class PyHciLeAclConnection(IEventStream):
+
+ def __init__(self, handle, acl_stream, device, peer, peer_resolvable, local_resolvable):
+ self.handle = int(handle)
+ self.device = device
+ self.peer = peer
+ self.peer_resolvable = peer_resolvable
+ self.local_resolvable = local_resolvable
+ # todo, handle we got is 0, so doesn't match - fix before enabling filtering
+ self.our_acl_stream = FilteringEventStream(acl_stream, None)
+
+ def send(self, pb_flag, b_flag, data):
+ acl = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data))
+ self.device.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))
+
+ def send_first(self, data):
+ self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
+ hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))
+
+ def send_continuing(self, data):
+ self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT,
+ bytes(data))
+
+ def get_event_queue(self):
+ return self.our_acl_stream.get_event_queue()
+
+ def local_resolvable_address(self):
+ return self.local_resolvable
+
+ def peer_resolvable_address(self):
+ return self.peer_resolvable
+
+ def peer_address(self):
+ return self.peer
+
+
class PyHciAdvertisement(object):
def __init__(self, handle, py_hci):
@@ -79,7 +115,7 @@
def set_data(self, complete_name):
data = GapData()
data.data_type = GapDataType.COMPLETE_LOCAL_NAME
- data.data = list(bytes(complete_name))
+ data.data = list(complete_name)
self.py_hci.send_command(
LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
@@ -87,7 +123,7 @@
def set_scan_response(self, shortened_name):
data = GapData()
data.data_type = GapDataType.SHORTENED_LOCAL_NAME
- data.data = list(bytes(shortened_name))
+ data.data = list(shortened_name)
self.py_hci.send_command(
LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
@@ -121,6 +157,7 @@
self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST,
hci_packets.EventCode.CONNECTION_COMPLETE,
hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
+ self.register_for_le_events(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty()))
def close(self):
@@ -187,6 +224,38 @@
raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
return PyHciAclConnection(handle, self.acl_stream, self.device)
+ def set_random_le_address(self, addr):
+ self.send_command(hci_packets.LeSetRandomAddressBuilder(addr))
+ assertThat(self.event_stream).emits(HciMatchers.CommandComplete(OpCode.LE_SET_RANDOM_ADDRESS))
+
+ def initiate_le_connection(self, remote_addr):
+ phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
+ phy_scan_params.scan_interval = 0x60
+ phy_scan_params.scan_window = 0x30
+ phy_scan_params.conn_interval_min = 0x18
+ phy_scan_params.conn_interval_max = 0x28
+ phy_scan_params.conn_latency = 0
+ phy_scan_params.supervision_timeout = 0x1f4
+ phy_scan_params.min_ce_length = 0
+ phy_scan_params.max_ce_length = 0
+ self.send_command(
+ hci_packets.LeExtendedCreateConnectionBuilder(
+ hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
+ hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))
+ assertThat(self.event_stream).emits(HciMatchers.CommandStatus(OpCode.LE_EXTENDED_CREATE_CONNECTION))
+
+ def incoming_le_connection(self):
+ connection_complete = HciCaptures.LeConnectionCompleteCapture()
+ assertThat(self.le_event_stream).emits(connection_complete)
+
+ handle = connection_complete.get().GetConnectionHandle()
+ peer = connection_complete.get().GetPeerAddress()
+ local_resolvable = connection_complete.get().GetLocalResolvablePrivateAddress()
+ peer_resolvable = connection_complete.get().GetPeerResolvablePrivateAddress()
+ if self.acl_stream is None:
+ raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
+ return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_resolvable, local_resolvable)
+
def create_advertisement(self,
handle,
own_address,
diff --git a/system/blueberry/tests/gd/hci/direct_hci_test.py b/system/blueberry/tests/gd/hci/direct_hci_test.py
index 23f1b93..c0dbdc4 100644
--- a/system/blueberry/tests/gd/hci/direct_hci_test.py
+++ b/system/blueberry/tests/gd/hci/direct_hci_test.py
@@ -205,7 +205,6 @@
def test_le_connection_dut_advertises(self):
self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ADVERTISING_SET_TERMINATED,
- SubeventCode.ENHANCED_CONNECTION_COMPLETE,
SubeventCode.READ_REMOTE_FEATURES_COMPLETE)
# Cert Connects
self.cert_hal.unmask_event(EventCode.LE_META_EVENT)
@@ -239,7 +238,6 @@
lambda packet: logging.debug(packet.payload) or b'SomeMoreAclData' in packet.payload)
def test_le_filter_accept_list_connection_cert_advertises(self):
- self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ENHANCED_CONNECTION_COMPLETE)
# DUT Connects
self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
self.dut_hci.send_command(
diff --git a/system/blueberry/tests/gd/hci/le_acl_manager_test.py b/system/blueberry/tests/gd/hci/le_acl_manager_test.py
index 34ef228..52f8534 100644
--- a/system/blueberry/tests/gd/hci/le_acl_manager_test.py
+++ b/system/blueberry/tests/gd/hci/le_acl_manager_test.py
@@ -16,16 +16,13 @@
from blueberry.tests.gd.cert import gd_base_test
from blueberry.tests.gd.cert.closable import safeClose
-from blueberry.tests.gd.cert.event_stream import EventStream
from blueberry.tests.gd.cert.truth import assertThat
+from blueberry.tests.gd.cert.py_hci import PyHci, PyHciAdvertisement
from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager
-from google.protobuf import empty_pb2 as empty_proto
from blueberry.facade import common_pb2 as common
-from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_facade
from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
from blueberry.facade.hci import hci_facade_pb2 as hci_facade
-import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import hci_packets
from bluetooth_packets_python3 import RawBuilder
from mobly import test_runner
@@ -38,14 +35,12 @@
def setup_test(self):
gd_base_test.GdBaseTestClass.setup_test(self)
+ self.cert_hci = PyHci(self.cert, acl_streaming=True)
self.dut_le_acl_manager = PyLeAclManager(self.dut)
- self.cert_hci_le_event_stream = EventStream(self.cert.hci.StreamLeSubevents(empty_proto.Empty()))
- self.cert_acl_data_stream = EventStream(self.cert.hci.StreamAcl(empty_proto.Empty()))
def teardown_test(self):
- safeClose(self.cert_hci_le_event_stream)
- safeClose(self.cert_acl_data_stream)
safeClose(self.dut_le_acl_manager)
+ self.cert_hci.close()
gd_base_test.GdBaseTestClass.teardown_test(self)
def set_privacy_policy_static(self):
@@ -74,89 +69,32 @@
self.cert.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))
def dut_connects(self, check_address):
- self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
- self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
-
# Cert Advertises
advertising_handle = 0
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
- advertising_handle,
- hci_packets.LegacyAdvertisingProperties.ADV_IND,
- 400,
- 450,
- 7,
- hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
- hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
- '00:00:00:00:00:00',
- hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
- 0xF8,
- 1, #SID
- hci_packets.Enable.DISABLED # Scan request notification
- ))
+ py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci)
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
+ self.cert_hci.create_advertisement(
+ advertising_handle,
+ '0C:05:04:03:02:01',
+ hci_packets.LegacyAdvertisingProperties.ADV_IND,
+ )
- gap_name = hci_packets.GapData()
- gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
- gap_name.data = list(bytes(b'Im_A_Cert'))
-
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingDataBuilder(
- advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
- hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
-
- gap_short_name = hci_packets.GapData()
- gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
- gap_short_name.data = list(bytes(b'Im_A_C'))
-
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
- advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
- hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
-
- enabled_set = hci_packets.EnabledSet()
- enabled_set.advertising_handle = advertising_handle
- enabled_set.duration = 0
- enabled_set.max_extended_advertising_events = 0
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
+ py_hci_adv.set_data(b'Im_A_Cert')
+ py_hci_adv.set_scan_response(b'Im_A_C')
+ py_hci_adv.start()
self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote(
remote_addr=common.BluetoothAddressWithType(
address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')),
type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)))
- # Cert gets ConnectionComplete with a handle and sends ACL data
- handle = 0xfff
- address = hci_packets.Address()
-
- def get_handle(packet):
- packet_bytes = packet.payload
- nonlocal handle
- nonlocal address
- if b'\x3e\x13\x01\x00' in packet_bytes:
- cc_view = hci_packets.LeConnectionCompleteView(
- hci_packets.LeMetaEventView(
- hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
- handle = cc_view.GetConnectionHandle()
- address = cc_view.GetPeerAddress()
- return True
- if b'\x3e\x1f\x0A\x00' in packet_bytes:
- cc_view = hci_packets.LeEnhancedConnectionCompleteView(
- hci_packets.LeMetaEventView(
- hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
- handle = cc_view.GetConnectionHandle()
- address = cc_view.GetPeerAddress()
- return True
- return False
-
- self.cert_hci_le_event_stream.assert_event_occurs(get_handle)
- self.cert_handle = handle
- dut_address_from_complete = address
+ py_hci_le_acl_connection = self.cert_hci.incoming_le_connection()
+ self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream
+ assertThat(py_hci_le_acl_connection.handle).isNotNone()
if check_address:
- assertThat(dut_address_from_complete).isEqualTo(self.dut_address.decode())
+ assertThat(py_hci_le_acl_connection.peer).isEqualTo(self.dut_address.decode())
+
+ self.cert_handle = py_hci_le_acl_connection.handle
def send_receive_and_check(self):
self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
@@ -164,7 +102,7 @@
bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))
self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')
- self.cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.payload)
+ assertThat(self.cert_acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
def test_dut_connects(self):
@@ -208,9 +146,6 @@
def test_cert_connects(self):
self.set_privacy_policy_static()
- self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
- self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
-
self.dut_le_acl_manager.listen_for_incoming_connections()
# DUT Advertises
@@ -233,53 +168,21 @@
self.dut.hci_le_advertising_manager.CreateAdvertiser(request)
# Cert Connects
- self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
- phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
- phy_scan_params.scan_interval = 0x60
- phy_scan_params.scan_window = 0x30
- phy_scan_params.conn_interval_min = 0x18
- phy_scan_params.conn_interval_max = 0x28
- phy_scan_params.conn_latency = 0
- phy_scan_params.supervision_timeout = 0x1f4
- phy_scan_params.min_ce_length = 0
- phy_scan_params.max_ce_length = 0
- self.enqueue_hci_command(
- hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
- hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
- hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
- self.dut_address.decode(), 1, [phy_scan_params]))
+ self.cert_hci.set_random_le_address('0C:05:04:03:02:01')
+ self.cert_hci.initiate_le_connection(self.dut_address.decode())
# Cert gets ConnectionComplete with a handle and sends ACL data
- handle = 0xfff
+ py_hci_le_acl_connection = self.cert_hci.incoming_le_connection()
- def get_handle(packet):
- packet_bytes = packet.payload
- nonlocal handle
- if b'\x3e\x13\x01\x00' in packet_bytes:
- cc_view = hci_packets.LeConnectionCompleteView(
- hci_packets.LeMetaEventView(
- hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
- handle = cc_view.GetConnectionHandle()
- return True
- if b'\x3e\x1f\x0A\x00' in packet_bytes:
- cc_view = hci_packets.LeEnhancedConnectionCompleteView(
- hci_packets.LeMetaEventView(
- hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
- handle = cc_view.GetConnectionHandle()
- return True
- return False
-
- self.cert_hci_le_event_stream.assert_event_occurs(get_handle)
- self.cert_handle = handle
-
- self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT,
- bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))
+ py_hci_le_acl_connection.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
+ hci_packets.BroadcastFlag.POINT_TO_POINT,
+ b'\x19\x00\x07\x00SomeAclData from the Cert')
+ self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream
+ assertThat(py_hci_le_acl_connection.handle).isNotNone()
+ self.cert_handle = py_hci_le_acl_connection.handle
# DUT gets a connection complete event and sends and receives
- handle = 0xfff
self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection()
-
self.send_receive_and_check()
def test_recombination_l2cap_packet(self):
@@ -294,8 +197,6 @@
assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload)
def test_background_connection(self):
- self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
- self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
self.set_privacy_policy_static()
# Start background and direct connection
@@ -309,49 +210,13 @@
# Cert Advertises
advertising_handle = 0
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
- advertising_handle,
- hci_packets.LegacyAdvertisingProperties.ADV_IND,
- 155, # 100ms = 160 * .625ms "LOW_LATENCY"
- 165,
- 7,
- hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
- hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
- '00:00:00:00:00:00',
- hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
- 0xF8,
- 1, #SID
- hci_packets.Enable.DISABLED # Scan request notification
- ))
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
+ py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01',
+ hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165)
- gap_name = hci_packets.GapData()
- gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
- gap_name.data = list(bytes(b'Im_A_Cert'))
-
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingDataBuilder(
- advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
- hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
-
- gap_short_name = hci_packets.GapData()
- gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
- gap_short_name.data = list(bytes(b'Im_A_C'))
-
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
- advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
- hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
-
- enabled_set = hci_packets.EnabledSet()
- enabled_set.advertising_handle = advertising_handle
- enabled_set.duration = 0
- enabled_set.max_extended_advertising_events = 0
- self.enqueue_hci_command(
- hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
+ py_hci_adv.set_data(b'Im_A_Cert')
+ py_hci_adv.set_scan_response(b'Im_A_C')
+ py_hci_adv.start()
# Check background connection complete
self.dut_le_acl_manager.complete_outgoing_connection(token)