LeAclManagerTest: Use PyHci abstractions

Bug:230123996
Test: gd/cert/run
Tag: #refactor

Change-Id: If869ce69289859f5faa175afc3672d86a2b9b1fc
Merged-In: If869ce69289859f5faa175afc3672d86a2b9b1fc
(cherry picked from commit e0f4f128d290385e93bb71df1133d8fdd0f7f3be)
diff --git a/system/blueberry/tests/gd/cert/py_hci.py b/system/blueberry/tests/gd/cert/py_hci.py
index 682c528..c62b618 100644
--- a/system/blueberry/tests/gd/cert/py_hci.py
+++ b/system/blueberry/tests/gd/cert/py_hci.py
@@ -70,6 +70,42 @@
         return self.our_acl_stream.get_event_queue()
 
 
+class PyHciLeAclConnection(IEventStream):
+
+    def __init__(self, handle, acl_stream, device, peer, peer_resolvable, local_resolvable):
+        self.handle = int(handle)
+        self.device = device
+        self.peer = peer
+        self.peer_resolvable = peer_resolvable
+        self.local_resolvable = local_resolvable
+        # todo, handle we got is 0, so doesn't match - fix before enabling filtering
+        self.our_acl_stream = FilteringEventStream(acl_stream, None)
+
+    def send(self, pb_flag, b_flag, data):
+        acl = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data))
+        self.device.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))
+
+    def send_first(self, data):
+        self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
+                  hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))
+
+    def send_continuing(self, data):
+        self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT,
+                  bytes(data))
+
+    def get_event_queue(self):
+        return self.our_acl_stream.get_event_queue()
+
+    def local_resolvable_address(self):
+        return self.local_resolvable
+
+    def peer_resolvable_address(self):
+        return self.peer_resolvable
+
+    def peer_address(self):
+        return self.peer
+
+
 class PyHciAdvertisement(object):
 
     def __init__(self, handle, py_hci):
@@ -79,7 +115,7 @@
     def set_data(self, complete_name):
         data = GapData()
         data.data_type = GapDataType.COMPLETE_LOCAL_NAME
-        data.data = list(bytes(complete_name))
+        data.data = list(complete_name)
         self.py_hci.send_command(
             LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                 FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
@@ -87,7 +123,7 @@
     def set_scan_response(self, shortened_name):
         data = GapData()
         data.data_type = GapDataType.SHORTENED_LOCAL_NAME
-        data.data = list(bytes(shortened_name))
+        data.data = list(shortened_name)
         self.py_hci.send_command(
             LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                         FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
@@ -121,6 +157,7 @@
             self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST,
                                      hci_packets.EventCode.CONNECTION_COMPLETE,
                                      hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
+            self.register_for_le_events(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
             self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty()))
 
     def close(self):
@@ -187,6 +224,38 @@
             raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
         return PyHciAclConnection(handle, self.acl_stream, self.device)
 
+    def set_random_le_address(self, addr):
+        self.send_command(hci_packets.LeSetRandomAddressBuilder(addr))
+        assertThat(self.event_stream).emits(HciMatchers.CommandComplete(OpCode.LE_SET_RANDOM_ADDRESS))
+
+    def initiate_le_connection(self, remote_addr):
+        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
+        phy_scan_params.scan_interval = 0x60
+        phy_scan_params.scan_window = 0x30
+        phy_scan_params.conn_interval_min = 0x18
+        phy_scan_params.conn_interval_max = 0x28
+        phy_scan_params.conn_latency = 0
+        phy_scan_params.supervision_timeout = 0x1f4
+        phy_scan_params.min_ce_length = 0
+        phy_scan_params.max_ce_length = 0
+        self.send_command(
+            hci_packets.LeExtendedCreateConnectionBuilder(
+                hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
+                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))
+        assertThat(self.event_stream).emits(HciMatchers.CommandStatus(OpCode.LE_EXTENDED_CREATE_CONNECTION))
+
+    def incoming_le_connection(self):
+        connection_complete = HciCaptures.LeConnectionCompleteCapture()
+        assertThat(self.le_event_stream).emits(connection_complete)
+
+        handle = connection_complete.get().GetConnectionHandle()
+        peer = connection_complete.get().GetPeerAddress()
+        local_resolvable = connection_complete.get().GetLocalResolvablePrivateAddress()
+        peer_resolvable = connection_complete.get().GetPeerResolvablePrivateAddress()
+        if self.acl_stream is None:
+            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
+        return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_resolvable, local_resolvable)
+
     def create_advertisement(self,
                              handle,
                              own_address,
diff --git a/system/blueberry/tests/gd/hci/direct_hci_test.py b/system/blueberry/tests/gd/hci/direct_hci_test.py
index 23f1b93..c0dbdc4 100644
--- a/system/blueberry/tests/gd/hci/direct_hci_test.py
+++ b/system/blueberry/tests/gd/hci/direct_hci_test.py
@@ -205,7 +205,6 @@
 
     def test_le_connection_dut_advertises(self):
         self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ADVERTISING_SET_TERMINATED,
-                                            SubeventCode.ENHANCED_CONNECTION_COMPLETE,
                                             SubeventCode.READ_REMOTE_FEATURES_COMPLETE)
         # Cert Connects
         self.cert_hal.unmask_event(EventCode.LE_META_EVENT)
@@ -239,7 +238,6 @@
             lambda packet: logging.debug(packet.payload) or b'SomeMoreAclData' in packet.payload)
 
     def test_le_filter_accept_list_connection_cert_advertises(self):
-        self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ENHANCED_CONNECTION_COMPLETE)
         # DUT Connects
         self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
         self.dut_hci.send_command(
diff --git a/system/blueberry/tests/gd/hci/le_acl_manager_test.py b/system/blueberry/tests/gd/hci/le_acl_manager_test.py
index 34ef228..52f8534 100644
--- a/system/blueberry/tests/gd/hci/le_acl_manager_test.py
+++ b/system/blueberry/tests/gd/hci/le_acl_manager_test.py
@@ -16,16 +16,13 @@
 
 from blueberry.tests.gd.cert import gd_base_test
 from blueberry.tests.gd.cert.closable import safeClose
-from blueberry.tests.gd.cert.event_stream import EventStream
 from blueberry.tests.gd.cert.truth import assertThat
+from blueberry.tests.gd.cert.py_hci import PyHci, PyHciAdvertisement
 from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager
-from google.protobuf import empty_pb2 as empty_proto
 from blueberry.facade import common_pb2 as common
-from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_facade
 from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
 from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
 from blueberry.facade.hci import hci_facade_pb2 as hci_facade
-import bluetooth_packets_python3 as bt_packets
 from bluetooth_packets_python3 import hci_packets
 from bluetooth_packets_python3 import RawBuilder
 from mobly import test_runner
@@ -38,14 +35,12 @@
 
     def setup_test(self):
         gd_base_test.GdBaseTestClass.setup_test(self)
+        self.cert_hci = PyHci(self.cert, acl_streaming=True)
         self.dut_le_acl_manager = PyLeAclManager(self.dut)
-        self.cert_hci_le_event_stream = EventStream(self.cert.hci.StreamLeSubevents(empty_proto.Empty()))
-        self.cert_acl_data_stream = EventStream(self.cert.hci.StreamAcl(empty_proto.Empty()))
 
     def teardown_test(self):
-        safeClose(self.cert_hci_le_event_stream)
-        safeClose(self.cert_acl_data_stream)
         safeClose(self.dut_le_acl_manager)
+        self.cert_hci.close()
         gd_base_test.GdBaseTestClass.teardown_test(self)
 
     def set_privacy_policy_static(self):
@@ -74,89 +69,32 @@
         self.cert.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))
 
     def dut_connects(self, check_address):
-        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
-        self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
-
         # Cert Advertises
         advertising_handle = 0
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
-                advertising_handle,
-                hci_packets.LegacyAdvertisingProperties.ADV_IND,
-                400,
-                450,
-                7,
-                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
-                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
-                '00:00:00:00:00:00',
-                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
-                0xF8,
-                1,  #SID
-                hci_packets.Enable.DISABLED  # Scan request notification
-            ))
+        py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci)
 
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
+        self.cert_hci.create_advertisement(
+            advertising_handle,
+            '0C:05:04:03:02:01',
+            hci_packets.LegacyAdvertisingProperties.ADV_IND,
+        )
 
-        gap_name = hci_packets.GapData()
-        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
-        gap_name.data = list(bytes(b'Im_A_Cert'))
-
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingDataBuilder(
-                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
-                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
-
-        gap_short_name = hci_packets.GapData()
-        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
-        gap_short_name.data = list(bytes(b'Im_A_C'))
-
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
-                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
-                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
-
-        enabled_set = hci_packets.EnabledSet()
-        enabled_set.advertising_handle = advertising_handle
-        enabled_set.duration = 0
-        enabled_set.max_extended_advertising_events = 0
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
+        py_hci_adv.set_data(b'Im_A_Cert')
+        py_hci_adv.set_scan_response(b'Im_A_C')
+        py_hci_adv.start()
 
         self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote(
             remote_addr=common.BluetoothAddressWithType(
                 address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')),
                 type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)))
 
-        # Cert gets ConnectionComplete with a handle and sends ACL data
-        handle = 0xfff
-        address = hci_packets.Address()
-
-        def get_handle(packet):
-            packet_bytes = packet.payload
-            nonlocal handle
-            nonlocal address
-            if b'\x3e\x13\x01\x00' in packet_bytes:
-                cc_view = hci_packets.LeConnectionCompleteView(
-                    hci_packets.LeMetaEventView(
-                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
-                handle = cc_view.GetConnectionHandle()
-                address = cc_view.GetPeerAddress()
-                return True
-            if b'\x3e\x1f\x0A\x00' in packet_bytes:
-                cc_view = hci_packets.LeEnhancedConnectionCompleteView(
-                    hci_packets.LeMetaEventView(
-                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
-                handle = cc_view.GetConnectionHandle()
-                address = cc_view.GetPeerAddress()
-                return True
-            return False
-
-        self.cert_hci_le_event_stream.assert_event_occurs(get_handle)
-        self.cert_handle = handle
-        dut_address_from_complete = address
+        py_hci_le_acl_connection = self.cert_hci.incoming_le_connection()
+        self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream
+        assertThat(py_hci_le_acl_connection.handle).isNotNone()
         if check_address:
-            assertThat(dut_address_from_complete).isEqualTo(self.dut_address.decode())
+            assertThat(py_hci_le_acl_connection.peer).isEqualTo(self.dut_address.decode())
+
+        self.cert_handle = py_hci_le_acl_connection.handle
 
     def send_receive_and_check(self):
         self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
@@ -164,7 +102,7 @@
                               bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))
 
         self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')
-        self.cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.payload)
+        assertThat(self.cert_acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
         assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
 
     def test_dut_connects(self):
@@ -208,9 +146,6 @@
 
     def test_cert_connects(self):
         self.set_privacy_policy_static()
-        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
-        self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
-
         self.dut_le_acl_manager.listen_for_incoming_connections()
 
         # DUT Advertises
@@ -233,53 +168,21 @@
         self.dut.hci_le_advertising_manager.CreateAdvertiser(request)
 
         # Cert Connects
-        self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
-        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
-        phy_scan_params.scan_interval = 0x60
-        phy_scan_params.scan_window = 0x30
-        phy_scan_params.conn_interval_min = 0x18
-        phy_scan_params.conn_interval_max = 0x28
-        phy_scan_params.conn_latency = 0
-        phy_scan_params.supervision_timeout = 0x1f4
-        phy_scan_params.min_ce_length = 0
-        phy_scan_params.max_ce_length = 0
-        self.enqueue_hci_command(
-            hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
-                                                          hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
-                                                          hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
-                                                          self.dut_address.decode(), 1, [phy_scan_params]))
+        self.cert_hci.set_random_le_address('0C:05:04:03:02:01')
+        self.cert_hci.initiate_le_connection(self.dut_address.decode())
 
         # Cert gets ConnectionComplete with a handle and sends ACL data
-        handle = 0xfff
+        py_hci_le_acl_connection = self.cert_hci.incoming_le_connection()
 
-        def get_handle(packet):
-            packet_bytes = packet.payload
-            nonlocal handle
-            if b'\x3e\x13\x01\x00' in packet_bytes:
-                cc_view = hci_packets.LeConnectionCompleteView(
-                    hci_packets.LeMetaEventView(
-                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
-                handle = cc_view.GetConnectionHandle()
-                return True
-            if b'\x3e\x1f\x0A\x00' in packet_bytes:
-                cc_view = hci_packets.LeEnhancedConnectionCompleteView(
-                    hci_packets.LeMetaEventView(
-                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
-                handle = cc_view.GetConnectionHandle()
-                return True
-            return False
-
-        self.cert_hci_le_event_stream.assert_event_occurs(get_handle)
-        self.cert_handle = handle
-
-        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
-                              hci_packets.BroadcastFlag.POINT_TO_POINT,
-                              bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))
+        py_hci_le_acl_connection.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
+                                      hci_packets.BroadcastFlag.POINT_TO_POINT,
+                                      b'\x19\x00\x07\x00SomeAclData from the Cert')
+        self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream
+        assertThat(py_hci_le_acl_connection.handle).isNotNone()
+        self.cert_handle = py_hci_le_acl_connection.handle
 
         # DUT gets a connection complete event and sends and receives
-        handle = 0xfff
         self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection()
-
         self.send_receive_and_check()
 
     def test_recombination_l2cap_packet(self):
@@ -294,8 +197,6 @@
         assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload)
 
     def test_background_connection(self):
-        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
-        self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
         self.set_privacy_policy_static()
 
         # Start background and direct connection
@@ -309,49 +210,13 @@
 
         # Cert Advertises
         advertising_handle = 0
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
-                advertising_handle,
-                hci_packets.LegacyAdvertisingProperties.ADV_IND,
-                155,  # 100ms = 160 * .625ms "LOW_LATENCY"
-                165,
-                7,
-                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
-                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
-                '00:00:00:00:00:00',
-                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
-                0xF8,
-                1,  #SID
-                hci_packets.Enable.DISABLED  # Scan request notification
-            ))
 
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
+        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01',
+                                                        hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165)
 
-        gap_name = hci_packets.GapData()
-        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
-        gap_name.data = list(bytes(b'Im_A_Cert'))
-
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingDataBuilder(
-                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
-                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
-
-        gap_short_name = hci_packets.GapData()
-        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
-        gap_short_name.data = list(bytes(b'Im_A_C'))
-
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
-                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
-                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
-
-        enabled_set = hci_packets.EnabledSet()
-        enabled_set.advertising_handle = advertising_handle
-        enabled_set.duration = 0
-        enabled_set.max_extended_advertising_events = 0
-        self.enqueue_hci_command(
-            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
+        py_hci_adv.set_data(b'Im_A_Cert')
+        py_hci_adv.set_scan_response(b'Im_A_C')
+        py_hci_adv.start()
 
         # Check background connection complete
         self.dut_le_acl_manager.complete_outgoing_connection(token)