Begin migrating L2capTest to use PyAclManager

First step towards migrating the cert l2cap module to its own class.

Yes there are hacks - they will go away in a future change.

Test: cert/run --host --test_filter=L2capTest
Change-Id: If30d8ca405927eb43c1e520868b0a2862bf8d75b
diff --git a/system/gd/cert/py_acl_manager.py b/system/gd/cert/py_acl_manager.py
index a8520cd..e3636c1 100644
--- a/system/gd/cert/py_acl_manager.py
+++ b/system/gd/cert/py_acl_manager.py
@@ -38,12 +38,15 @@
         self.our_acl_stream = FilteringEventStream(acl_stream, None)
 
         if remote_addr:
+            remote_addr_bytes = bytes(
+                remote_addr,
+                'utf8') if type(remote_addr) is str else bytes(remote_addr)
             self.connection_event_stream = EventStream(
                 self.device.hci_acl_manager.CreateConnection(
                     acl_manager_facade.ConnectionMsg(
                         address_type=int(
                             hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
-                        address=bytes(remote_addr, 'utf8'))))
+                        address=remote_addr_bytes)))
         else:
             self.connection_event_stream = None
 
@@ -76,6 +79,10 @@
         safeClose(self.acl_stream)
         safeClose(self.incoming_connection_stream)
 
+    # temporary, until everyone is migrated
+    def get_acl_stream(self):
+        return self.acl_stream
+
     def listen_for_incoming_connections(self):
         self.incoming_connection_stream = EventStream(
             self.device.hci_acl_manager.FetchIncomingConnection(
diff --git a/system/gd/l2cap/classic/cert/l2cap_test.py b/system/gd/l2cap/classic/cert/l2cap_test.py
index 3361019..54909680 100644
--- a/system/gd/l2cap/classic/cert/l2cap_test.py
+++ b/system/gd/l2cap/classic/cert/l2cap_test.py
@@ -20,6 +20,7 @@
 from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
 from cert.event_stream import EventStream
 from cert.py_l2cap import PyL2cap
+from cert.py_acl_manager import PyAclManager
 from facade import common_pb2
 from facade import rootservice_pb2 as facade_rootservice
 from google.protobuf import empty_pb2 as empty_proto
@@ -69,6 +70,11 @@
         self.ertm_max_transmit = 20
 
         self.dut_l2cap = PyL2cap(self.dut)
+        self.cert_acl_manager = PyAclManager(self.cert)
+
+    def teardown_test(self):
+        self.cert_acl_manager.close()
+        super().teardown_test()
 
     def _on_connection_request_default(self, l2cap_control_view):
         connection_request_view = l2cap_packets.ConnectionRequestView(
@@ -457,33 +463,11 @@
         self.dut.neighbor.EnablePageScan(
             neighbor_facade.EnableMsg(enabled=True))
 
-        with EventStream(
-                self.cert.hci_acl_manager.CreateConnection(
-                    acl_manager_facade.ConnectionMsg(
-                        address_type=int(
-                            hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
-                        address=bytes(self.dut_address.address)))
-        ) as connection_event_stream:
-
-            # Cert gets ConnectionComplete with a handle and sends ACL data
-            handle = 0xfff
-
-            def get_handle(packet):
-                packet_bytes = packet.event
-                if b'\x03\x0b\x00' in packet_bytes:
-                    nonlocal handle
-                    cc_view = hci_packets.ConnectionCompleteView(
-                        hci_packets.EventPacketView(
-                            bt_packets.PacketViewLittleEndian(
-                                list(packet_bytes))))
-                    handle = cc_view.GetConnectionHandle()
-                    return True
-                return False
-
-            connection_event_stream.assert_event_occurs(get_handle)
-
-        self.cert_acl_handle = handle
-        return handle
+        with self.cert_acl_manager.initiate_connection(
+                self.dut.address) as cert_acl:
+            cert_acl.wait_for_connection_complete()
+            self.cert_acl_handle = cert_acl.handle  ## HACK HACK HACK
+            return cert_acl.handle
 
     def _open_channel(
             self,
@@ -520,99 +504,82 @@
 
     def test_connect_dynamic_channel_and_send_data(self):
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
-                               psm)
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b'abc' in packet.payload)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm)
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b'abc' in packet.payload)
 
     def test_fixed_channel(self):
         cert_acl_handle = self._setup_link_from_cert()
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
         self.dut.l2cap.RegisterChannel(
             l2cap_facade_pb2.RegisterChannelRequest(channel=2))
         asserts.skip("FIXME: Not working")
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            self.dut.l2cap.SendL2capPacket(
-                l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b'123' in packet.payload)
+        self.dut.l2cap.SendL2capPacket(
+            l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b'123' in packet.payload)
 
     def test_receive_packet_from_unknown_channel(self):
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
-                               psm)
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                0x99, 0, l2cap_packets.Final.NOT_SET, 1,
-                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
-            cert_acl_data_stream.assert_none_matching(
-                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
-                timedelta(seconds=1))
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm)
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            0x99, 0, l2cap_packets.Final.NOT_SET, 1,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
+        cert_acl_data_stream.assert_none_matching(
+            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
+            timedelta(seconds=1))
 
     def test_open_two_channels(self):
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, 0x41,
-                               0x41)
-            self._open_channel(cert_acl_data_stream, 2, cert_acl_handle, 0x43,
-                               0x43)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, 0x41, 0x41)
+        self._open_channel(cert_acl_data_stream, 2, cert_acl_handle, 0x43, 0x43)
 
     def test_connect_and_send_data_ertm_no_segmentation(self):
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(
-                    psm=psm, payload=b'abc' * 34))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b'abc' * 34 in packet.payload)
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc' * 34))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b'abc' * 34 in packet.payload)
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 0, l2cap_packets.Final.NOT_SET, 1,
-                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 0, l2cap_packets.Final.NOT_SET, 1,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
 
     def test_basic_operation_request_connection(self):
         """
@@ -621,121 +588,107 @@
         initiate the configuration procedure.
         """
         cert_acl_handle = self._setup_link_from_cert()
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
 
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            psm = 0x33
-            # TODO: Use another test case
-            self.dut.l2cap.OpenChannel(
-                l2cap_facade_pb2.OpenChannelRequest(
-                    remote=self.cert_address, psm=psm))
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_connection_request)
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        psm = 0x33
+        # TODO: Use another test case
+        self.dut.l2cap.OpenChannel(
+            l2cap_facade_pb2.OpenChannelRequest(
+                remote=self.cert_address, psm=psm))
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_connection_request)
 
     def test_accept_disconnect(self):
         """
         L2CAP/COS/CED/BV-07-C
         """
         cert_acl_handle = self._setup_link_from_cert()
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
 
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            scid = 0x41
-            psm = 0x33
-            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
-                               psm)
+        scid = 0x41
+        psm = 0x33
+        self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            close_channel = l2cap_packets.DisconnectionRequestBuilder(
-                1, dcid, scid)
-            close_channel_l2cap = l2cap_packets.BasicFrameBuilder(
-                1, close_channel)
-            self.cert_send_b_frame(close_channel_l2cap)
+        close_channel = l2cap_packets.DisconnectionRequestBuilder(1, dcid, scid)
+        close_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, close_channel)
+        self.cert_send_b_frame(close_channel_l2cap)
 
-            def verify_disconnection_response(packet):
-                packet_bytes = packet.payload
-                l2cap_view = l2cap_packets.BasicFrameView(
-                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-                l2cap_control_view = l2cap_packets.ControlView(
-                    l2cap_view.GetPayload())
-                if l2cap_control_view.GetCode(
-                ) != l2cap_packets.CommandCode.DISCONNECTION_RESPONSE:
-                    return False
-                disconnection_response_view = l2cap_packets.DisconnectionResponseView(
-                    l2cap_control_view)
-                return disconnection_response_view.GetSourceCid(
-                ) == scid and disconnection_response_view.GetDestinationCid(
-                ) == dcid
+        def verify_disconnection_response(packet):
+            packet_bytes = packet.payload
+            l2cap_view = l2cap_packets.BasicFrameView(
+                bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+            l2cap_control_view = l2cap_packets.ControlView(
+                l2cap_view.GetPayload())
+            if l2cap_control_view.GetCode(
+            ) != l2cap_packets.CommandCode.DISCONNECTION_RESPONSE:
+                return False
+            disconnection_response_view = l2cap_packets.DisconnectionResponseView(
+                l2cap_control_view)
+            return disconnection_response_view.GetSourceCid(
+            ) == scid and disconnection_response_view.GetDestinationCid() == dcid
 
-            cert_acl_data_stream.assert_event_occurs(
-                verify_disconnection_response)
+        cert_acl_data_stream.assert_event_occurs(verify_disconnection_response)
 
     def test_disconnect_on_timeout(self):
         """
         L2CAP/COS/CED/BV-08-C
         """
         cert_acl_handle = self._setup_link_from_cert()
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
 
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            scid = 0x41
-            psm = 0x33
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        scid = 0x41
+        psm = 0x33
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            # Don't send configuration request or response back
-            self.on_configuration_request = lambda _: True
-            self.on_connection_response = lambda _: True
+        # Don't send configuration request or response back
+        self.on_configuration_request = lambda _: True
+        self.on_connection_response = lambda _: True
 
-            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
-                               psm)
+        self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm)
 
-            def is_configuration_response(l2cap_packet):
-                packet_bytes = l2cap_packet.payload
-                l2cap_view = l2cap_packets.BasicFrameView(
-                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-                if l2cap_view.GetChannelId() != 1:
-                    return False
-                l2cap_control_view = l2cap_packets.ControlView(
-                    l2cap_view.GetPayload())
-                return l2cap_control_view.GetCode(
-                ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE
+        def is_configuration_response(l2cap_packet):
+            packet_bytes = l2cap_packet.payload
+            l2cap_view = l2cap_packets.BasicFrameView(
+                bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+            if l2cap_view.GetChannelId() != 1:
+                return False
+            l2cap_control_view = l2cap_packets.ControlView(
+                l2cap_view.GetPayload())
+            return l2cap_control_view.GetCode(
+            ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE
 
-            cert_acl_data_stream.assert_none_matching(is_configuration_response)
+        cert_acl_data_stream.assert_none_matching(is_configuration_response)
 
     def test_retry_config_after_rejection(self):
         """
         L2CAP/COS/CFD/BV-02-C
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            psm = 0x33
-            scid = 0x41
+        psm = 0x33
+        scid = 0x41
 
-            self.on_configuration_request = self._on_configuration_request_unacceptable_parameters
+        self.on_configuration_request = self._on_configuration_request_unacceptable_parameters
 
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)
 
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request, at_least_times=2)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request, at_least_times=2)
 
     def test_respond_to_echo_request(self):
         """
@@ -743,85 +696,78 @@
         Verify that the IUT responds to an echo request.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            echo_request = l2cap_packets.EchoRequestBuilder(
-                100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3))
-            echo_request_l2cap = l2cap_packets.BasicFrameBuilder(
-                1, echo_request)
-            self.cert_send_b_frame(echo_request_l2cap)
+        echo_request = l2cap_packets.EchoRequestBuilder(
+            100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3))
+        echo_request_l2cap = l2cap_packets.BasicFrameBuilder(1, echo_request)
+        self.cert_send_b_frame(echo_request_l2cap)
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b"\x06\x01\x04\x00\x02\x00\x03\x00" in packet.payload
-            )
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b"\x06\x01\x04\x00\x02\x00\x03\x00" in packet.payload
+        )
 
     def test_reject_unknown_command(self):
         """
         L2CAP/COS/CED/BI-01-C
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00"
-            self.cert.hci_acl_manager.SendAclData(
-                acl_manager_facade.AclData(
-                    handle=cert_acl_handle,
-                    payload=bytes(invalid_command_packet)))
+        invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00"
+        self.cert.hci_acl_manager.SendAclData(
+            acl_manager_facade.AclData(
+                handle=cert_acl_handle, payload=bytes(invalid_command_packet)))
 
-            def is_command_reject(l2cap_packet):
-                packet_bytes = l2cap_packet.payload
-                l2cap_view = l2cap_packets.BasicFrameView(
-                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-                if l2cap_view.GetChannelId() != 1:
-                    return False
-                l2cap_control_view = l2cap_packets.ControlView(
-                    l2cap_view.GetPayload())
-                return l2cap_control_view.GetCode(
-                ) == l2cap_packets.CommandCode.COMMAND_REJECT
+        def is_command_reject(l2cap_packet):
+            packet_bytes = l2cap_packet.payload
+            l2cap_view = l2cap_packets.BasicFrameView(
+                bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+            if l2cap_view.GetChannelId() != 1:
+                return False
+            l2cap_control_view = l2cap_packets.ControlView(
+                l2cap_view.GetPayload())
+            return l2cap_control_view.GetCode(
+            ) == l2cap_packets.CommandCode.COMMAND_REJECT
 
-            cert_acl_data_stream.assert_event_occurs(is_command_reject)
+        cert_acl_data_stream.assert_event_occurs(is_command_reject)
 
     def test_query_for_1_2_features(self):
         """
         L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features]
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            signal_id = 3
-            information_request = l2cap_packets.InformationRequestBuilder(
-                signal_id, l2cap_packets.InformationRequestInfoType.
-                EXTENDED_FEATURES_SUPPORTED)
-            information_request_l2cap = l2cap_packets.BasicFrameBuilder(
-                1, information_request)
-            self.cert_send_b_frame(information_request_l2cap)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        signal_id = 3
+        information_request = l2cap_packets.InformationRequestBuilder(
+            signal_id,
+            l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
+        )
+        information_request_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, information_request)
+        self.cert_send_b_frame(information_request_l2cap)
 
-            def is_correct_information_response(l2cap_packet):
-                packet_bytes = l2cap_packet.payload
-                l2cap_view = l2cap_packets.BasicFrameView(
-                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-                if l2cap_view.GetChannelId() != 1:
-                    return False
-                l2cap_control_view = l2cap_packets.ControlView(
-                    l2cap_view.GetPayload())
-                if l2cap_control_view.GetCode(
-                ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
-                    return False
-                information_response_view = l2cap_packets.InformationResponseView(
-                    l2cap_control_view)
-                return information_response_view.GetInfoType(
-                ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
+        def is_correct_information_response(l2cap_packet):
+            packet_bytes = l2cap_packet.payload
+            l2cap_view = l2cap_packets.BasicFrameView(
+                bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+            if l2cap_view.GetChannelId() != 1:
+                return False
+            l2cap_control_view = l2cap_packets.ControlView(
+                l2cap_view.GetPayload())
+            if l2cap_control_view.GetCode(
+            ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
+                return False
+            information_response_view = l2cap_packets.InformationResponseView(
+                l2cap_control_view)
+            return information_response_view.GetInfoType(
+            ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
 
-            cert_acl_data_stream.assert_event_occurs(
-                is_correct_information_response)
+        cert_acl_data_stream.assert_event_occurs(
+            is_correct_information_response)
 
     def test_extended_feature_info_response_ertm(self):
         """
@@ -829,41 +775,40 @@
         Retransmission Mode]
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            signal_id = 3
-            information_request = l2cap_packets.InformationRequestBuilder(
-                signal_id, l2cap_packets.InformationRequestInfoType.
-                EXTENDED_FEATURES_SUPPORTED)
-            information_request_l2cap = l2cap_packets.BasicFrameBuilder(
-                1, information_request)
-            self.cert_send_b_frame(information_request_l2cap)
+        signal_id = 3
+        information_request = l2cap_packets.InformationRequestBuilder(
+            signal_id,
+            l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
+        )
+        information_request_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, information_request)
+        self.cert_send_b_frame(information_request_l2cap)
 
-            def is_correct_information_response(l2cap_packet):
-                packet_bytes = l2cap_packet.payload
-                l2cap_view = l2cap_packets.BasicFrameView(
-                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-                if l2cap_view.GetChannelId() != 1:
-                    return False
-                l2cap_control_view = l2cap_packets.ControlView(
-                    l2cap_view.GetPayload())
-                if l2cap_control_view.GetCode(
-                ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
-                    return False
-                information_response_view = l2cap_packets.InformationResponseView(
-                    l2cap_control_view)
-                if information_response_view.GetInfoType(
-                ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
-                    return False
-                extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
-                    information_response_view)
-                return extended_features_view.GetEnhancedRetransmissionMode()
+        def is_correct_information_response(l2cap_packet):
+            packet_bytes = l2cap_packet.payload
+            l2cap_view = l2cap_packets.BasicFrameView(
+                bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+            if l2cap_view.GetChannelId() != 1:
+                return False
+            l2cap_control_view = l2cap_packets.ControlView(
+                l2cap_view.GetPayload())
+            if l2cap_control_view.GetCode(
+            ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
+                return False
+            information_response_view = l2cap_packets.InformationResponseView(
+                l2cap_control_view)
+            if information_response_view.GetInfoType(
+            ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
+                return False
+            extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
+                information_response_view)
+            return extended_features_view.GetEnhancedRetransmissionMode()
 
-            cert_acl_data_stream.assert_event_occurs(
-                is_correct_information_response)
+        cert_acl_data_stream.assert_event_occurs(
+            is_correct_information_response)
 
     def test_extended_feature_info_response_fcs(self):
         """
@@ -871,41 +816,40 @@
         Note: This is not mandated by L2CAP Spec
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            signal_id = 3
-            information_request = l2cap_packets.InformationRequestBuilder(
-                signal_id, l2cap_packets.InformationRequestInfoType.
-                EXTENDED_FEATURES_SUPPORTED)
-            information_request_l2cap = l2cap_packets.BasicFrameBuilder(
-                1, information_request)
-            self.cert_send_b_frame(information_request_l2cap)
+        signal_id = 3
+        information_request = l2cap_packets.InformationRequestBuilder(
+            signal_id,
+            l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
+        )
+        information_request_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, information_request)
+        self.cert_send_b_frame(information_request_l2cap)
 
-            def is_correct_information_response(l2cap_packet):
-                packet_bytes = l2cap_packet.payload
-                l2cap_view = l2cap_packets.BasicFrameView(
-                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-                if l2cap_view.GetChannelId() != 1:
-                    return False
-                l2cap_control_view = l2cap_packets.ControlView(
-                    l2cap_view.GetPayload())
-                if l2cap_control_view.GetCode(
-                ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
-                    return False
-                information_response_view = l2cap_packets.InformationResponseView(
-                    l2cap_control_view)
-                if information_response_view.GetInfoType(
-                ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
-                    return False
-                extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
-                    information_response_view)
-                return extended_features_view.GetFcsOption()
+        def is_correct_information_response(l2cap_packet):
+            packet_bytes = l2cap_packet.payload
+            l2cap_view = l2cap_packets.BasicFrameView(
+                bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+            if l2cap_view.GetChannelId() != 1:
+                return False
+            l2cap_control_view = l2cap_packets.ControlView(
+                l2cap_view.GetPayload())
+            if l2cap_control_view.GetCode(
+            ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
+                return False
+            information_response_view = l2cap_packets.InformationResponseView(
+                l2cap_control_view)
+            if information_response_view.GetInfoType(
+            ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
+                return False
+            extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
+                information_response_view)
+            return extended_features_view.GetFcsOption()
 
-            cert_acl_data_stream.assert_event_occurs(
-                is_correct_information_response)
+        cert_acl_data_stream.assert_event_occurs(
+            is_correct_information_response)
 
     def test_config_channel_not_use_FCS(self):
         """
@@ -913,32 +857,30 @@
         Verify the IUT can configure a channel to not use FCS in I/S-frames.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            self.on_connection_response = self._on_connection_response_use_ertm
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b"abc" in packet.payload)
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b"abc" in packet.payload)
 
     def test_explicitly_request_use_FCS(self):
         """
@@ -948,32 +890,30 @@
         """
 
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b"abc\x4f\xa3" in packet.payload
-            )  # TODO: Use packet parser
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b"abc\x4f\xa3" in packet.payload
+        )  # TODO: Use packet parser
 
     def test_implicitly_request_use_FCS(self):
         """
@@ -981,97 +921,90 @@
         TODO: Update this test case. What's the difference between this one and test_explicitly_request_use_FCS?
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b"abc\x4f\xa3" in packet.payload
-            )  # TODO: Use packet parser
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b"abc\x4f\xa3" in packet.payload
+        )  # TODO: Use packet parser
 
     def test_transmit_i_frames(self):
         """
         L2CAP/ERM/BV-01-C [Transmit I-frames]
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
 
-            self.on_connection_response = self._on_connection_response_use_ertm
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        self.on_connection_response = self._on_connection_response_use_ertm
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b"abc" in packet.payload)
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b"abc" in packet.payload)
 
-            # Assemble a sample packet. TODO: Use RawBuilder
-            SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
+        # Assemble a sample packet. TODO: Use RawBuilder
+        SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 0, l2cap_packets.Final.NOT_SET, 1,
-                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 0, l2cap_packets.Final.NOT_SET, 1,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b"abc" in packet.payload)
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b"abc" in packet.payload)
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 1, l2cap_packets.Final.NOT_SET, 2,
-                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 1, l2cap_packets.Final.NOT_SET, 2,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: b"abc" in packet.payload)
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: b"abc" in packet.payload)
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 2, l2cap_packets.Final.NOT_SET, 3,
-                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 2, l2cap_packets.Final.NOT_SET, 3,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
 
     def test_receive_i_frames(self):
         """
@@ -1079,64 +1012,61 @@
         Verify the IUT can receive in-sequence valid I-frames and deliver L2CAP SDUs to the Upper Tester
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            for i in range(3):
-                i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                    dcid, i, l2cap_packets.Final.NOT_SET, 0,
-                    l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                    SAMPLE_PACKET)
-                self.cert_send_b_frame(i_frame)
-                cert_acl_data_stream.assert_event_occurs(
-                    lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
-                )
-
+        for i in range(3):
             i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 3, l2cap_packets.Final.NOT_SET, 0,
-                l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4
-            )
-
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 4, l2cap_packets.Final.NOT_SET, 0,
-                l2cap_packets.SegmentationAndReassembly.CONTINUATION,
+                dcid, i, l2cap_packets.Final.NOT_SET, 0,
+                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
                 SAMPLE_PACKET)
             self.cert_send_b_frame(i_frame)
             cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 5
+                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
             )
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 5, l2cap_packets.Final.NOT_SET, 0,
-                l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 6
-            )
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 3, l2cap_packets.Final.NOT_SET, 0,
+            l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4
+        )
+
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 4, l2cap_packets.Final.NOT_SET, 0,
+            l2cap_packets.SegmentationAndReassembly.CONTINUATION, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 5
+        )
+
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 5, l2cap_packets.Final.NOT_SET, 0,
+            l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 6
+        )
 
     def test_acknowledging_received_i_frames(self):
         """
@@ -1145,43 +1075,41 @@
         Lower Tester
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            # FIXME: Order shouldn't matter here
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
+
+        for i in range(3):
+            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+                dcid, i, l2cap_packets.Final.NOT_SET, 0,
+                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
+                SAMPLE_PACKET)
+            self.cert_send_b_frame(i_frame)
             cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
+            )
 
-            for i in range(3):
-                i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                    dcid, i, l2cap_packets.Final.NOT_SET, 0,
-                    l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                    SAMPLE_PACKET)
-                self.cert_send_b_frame(i_frame)
-                cert_acl_data_stream.assert_event_occurs(
-                    lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
-                )
-
-            cert_acl_data_stream.assert_none_matching(
-                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
-                timedelta(seconds=1))
+        cert_acl_data_stream.assert_none_matching(
+            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
+            timedelta(seconds=1))
 
     def test_resume_transmitting_when_received_rr(self):
         """
@@ -1192,50 +1120,45 @@
         """
         self.ertm_tx_window_size = 1
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
 
-            # TODO: Besides checking TxSeq, we also want to check payload, once we can get it from packet view
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
-            )
-            cert_acl_data_stream.assert_none_matching(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
-            )
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
-                1)
-            self.cert_send_b_frame(s_frame)
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
-            )
+        # TODO: Besides checking TxSeq, we also want to check payload, once we can get it from packet view
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
+        cert_acl_data_stream.assert_none_matching(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+        )
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1)
+        self.cert_send_b_frame(s_frame)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
 
     def test_resume_transmitting_when_acknowledge_previously_sent(self):
         """
@@ -1246,58 +1169,52 @@
         """
         self.ertm_tx_window_size = 1
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
-            )
-            # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout
-            cert_acl_data_stream.assert_none_matching(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
-                timedelta(seconds=1))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
+        # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout
+        cert_acl_data_stream.assert_none_matching(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+            timedelta(seconds=1))
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 0, l2cap_packets.Final.NOT_SET, 1,
-                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 0, l2cap_packets.Final.NOT_SET, 1,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
-            )
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 1, l2cap_packets.Final.NOT_SET, 2,
-                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 1, l2cap_packets.Final.NOT_SET, 2,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
 
     def test_transmit_s_frame_rr_with_poll_bit_set(self):
         """
@@ -1305,35 +1222,33 @@
         Verify the IUT sends an S-frame [RR] with the Poll bit set when its retransmission timer expires.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            # TODO: Always use their retransmission timeout value
-            time.sleep(2)
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
-            )
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        # TODO: Always use their retransmission timeout value
+        time.sleep(2)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+        )
 
     def test_transmit_s_frame_rr_with_final_bit_set(self):
         """
@@ -1342,38 +1257,36 @@
         with the Poll bit set.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-                l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
-            self.cert_send_b_frame(s_frame)
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+            l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
+        self.cert_send_b_frame(s_frame)
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE
-            )
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE
+        )
 
     def test_s_frame_transmissions_exceed_max_transmit(self):
         """
@@ -1382,37 +1295,35 @@
         """
         asserts.skip("Need to configure DUT to have a shorter timer")
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
 
-            # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
-            time.sleep(362)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_disconnection_request)
+        # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
+        time.sleep(362)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_disconnection_request)
 
     def test_i_frame_transmissions_exceed_max_transmit(self):
         """
@@ -1421,45 +1332,41 @@
         not acknowledge the previous I-frame sent by the IUT.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
-            )
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
 
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
-                0)
-            self.cert_send_b_frame(s_frame)
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+        self.cert_send_b_frame(s_frame)
 
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_disconnection_request)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_disconnection_request)
 
     def test_respond_to_rej(self):
         """
@@ -1469,48 +1376,46 @@
         self.ertm_tx_window_size = 2
         self.ertm_max_transmit = 2
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
+
+        dcid = self.scid_to_dcid[scid]
+
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        for i in range(2):
             cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
+                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
+                timeout=timedelta(seconds=0.5))
+
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.REJECT,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
+        self.cert_send_b_frame(s_frame)
+
+        for i in range(2):
             cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
-
-            dcid = self.scid_to_dcid[scid]
-
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            for i in range(2):
-                cert_acl_data_stream.assert_event_occurs(
-                    lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
-                    timeout=timedelta(seconds=0.5))
-
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.REJECT,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
-            self.cert_send_b_frame(s_frame)
-
-            for i in range(2):
-                cert_acl_data_stream.assert_event_occurs(
-                    lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
-                    timeout=timedelta(seconds=0.5))
+                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
+                timeout=timedelta(seconds=0.5))
 
     def test_receive_s_frame_rr_final_bit_set(self):
         """
@@ -1519,48 +1424,44 @@
         [RR] with the Final Bit set.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
 
-            # TODO: Always use their retransmission timeout value
-            time.sleep(2)
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
-            )
+        # TODO: Always use their retransmission timeout value
+        time.sleep(2)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+        )
 
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
-                0)
-            self.cert_send_b_frame(s_frame)
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+        self.cert_send_b_frame(s_frame)
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
-            )
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
 
     def test_receive_i_frame_final_bit_set(self):
         """
@@ -1569,48 +1470,44 @@
         with the final bit set.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
 
-            # TODO: Always use their retransmission timeout value
-            time.sleep(2)
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
-            )
+        # TODO: Always use their retransmission timeout value
+        time.sleep(2)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+        )
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
-                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
-            )
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
 
     def test_recieve_rnr(self):
         """
@@ -1619,48 +1516,44 @@
         Lower Tester (S-frame [RNR]).
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
 
-            # TODO: Always use their retransmission timeout value
-            time.sleep(2)
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
-            )
+        # TODO: Always use their retransmission timeout value
+        time.sleep(2)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+        )
 
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
-                0)
-            self.cert_send_b_frame(s_frame)
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+        self.cert_send_b_frame(s_frame)
 
-            cert_acl_data_stream.assert_none_matching(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
-            )
+        cert_acl_data_stream.assert_none_matching(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
 
     def test_sent_rej_lost(self):
         """
@@ -1670,126 +1563,118 @@
         """
         self.ertm_tx_window_size = 5
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 0, l2cap_packets.Final.NOT_SET, 0,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1
+        )
+
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, self.ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET, 0,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_s_from_ertm_s_frame(scid, packet) == l2cap_packets.SupervisoryFunction.REJECT
+        )
+
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+            l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
+        self.cert_send_b_frame(s_frame)
+
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1 and self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE)
+        for i in range(1, self.ertm_tx_window_size):
             i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 0, l2cap_packets.Final.NOT_SET, 0,
+                dcid, i, l2cap_packets.Final.NOT_SET, 0,
                 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
                 SAMPLE_PACKET)
             self.cert_send_b_frame(i_frame)
             cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1
+                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
             )
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, self.ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET,
-                0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_s_from_ertm_s_frame(scid, packet) == l2cap_packets.SupervisoryFunction.REJECT
-            )
-
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-                l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
-            self.cert_send_b_frame(s_frame)
-
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1 and self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE)
-            for i in range(1, self.ertm_tx_window_size):
-                i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                    dcid, i, l2cap_packets.Final.NOT_SET, 0,
-                    l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                    SAMPLE_PACKET)
-                self.cert_send_b_frame(i_frame)
-                cert_acl_data_stream.assert_event_occurs(
-                    lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
-                )
-
     def test_handle_duplicate_srej(self):
         """
         L2CAP/ERM/BI-03-C [Handle Duplicate S-Frame [SREJ]]
         Verify the IUT will only retransmit the requested I-frame once after receiving a duplicate SREJ.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
-                timeout=timedelta(0.5))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
-                timeout=timedelta(0.5))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
-            )
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
+            timeout=timedelta(0.5))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+            timeout=timedelta(0.5))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+        )
 
-            # Send SREJ with F not set
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
-            self.cert_send_b_frame(s_frame)
+        # Send SREJ with F not set
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
+        self.cert_send_b_frame(s_frame)
 
-            cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
-            # Send SREJ with F set
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
-                0)
-            self.cert_send_b_frame(s_frame)
+        cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
+        # Send SREJ with F set
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+        self.cert_send_b_frame(s_frame)
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
-            )
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
 
     def test_handle_receipt_rej_and_rr_with_f_set(self):
         """
@@ -1799,65 +1684,60 @@
         retransmitted.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
-                timeout=timedelta(0.5))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
-                timeout=timedelta(0.5))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
-                timeout=timedelta(2))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
+            timeout=timedelta(0.5))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+            timeout=timedelta(0.5))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
+            timeout=timedelta(2))
 
-            # Send REJ with F not set
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.REJECT,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
-            self.cert_send_b_frame(s_frame)
+        # Send REJ with F not set
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.REJECT,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
+        self.cert_send_b_frame(s_frame)
 
-            cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
+        cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
 
-            # Send RR with F set
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.REJECT,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
-                0)
-            self.cert_send_b_frame(s_frame)
+        # Send RR with F set
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.REJECT,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+        self.cert_send_b_frame(s_frame)
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
-            )
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
-            )
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
 
     def test_handle_rej_and_i_frame_with_f_set(self):
         """
@@ -1866,64 +1746,59 @@
         followed by an I-frame with the Final bit set that indicates the same I-frames should be retransmitted.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # FIXME: Order shouldn't matter here
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
+        # FIXME: Order shouldn't matter here
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
 
-            dcid = self.scid_to_dcid[scid]
+        dcid = self.scid_to_dcid[scid]
 
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            self.dut.l2cap.SendDynamicChannelPacket(
-                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
-                timeout=timedelta(0.5))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
-                timeout=timedelta(0.5))
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
-                timeout=timedelta(2))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        self.dut.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
+            timeout=timedelta(0.5))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+            timeout=timedelta(0.5))
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
+            timeout=timedelta(2))
 
-            # Send SREJ with F not set
-            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-                dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
-                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
-            self.cert_send_b_frame(s_frame)
+        # Send SREJ with F not set
+        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
+            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
+        self.cert_send_b_frame(s_frame)
 
-            cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
+        cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
 
-            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-                dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
-                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
-                SAMPLE_PACKET)
-            self.cert_send_b_frame(i_frame)
+        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+            dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
+            l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+        self.cert_send_b_frame(i_frame)
 
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
-            )
-            cert_acl_data_stream.assert_event_occurs(
-                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
-            )
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
+        cert_acl_data_stream.assert_event_occurs(
+            lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
 
     def test_initiated_configuration_request_ertm(self):
         """
@@ -1932,27 +1807,25 @@
         Enhanced Retransmission Mode.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            self.on_connection_response = self._on_connection_response_use_ertm
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        self.on_connection_response = self._on_connection_response_use_ertm
 
-            psm = 0x33
-            scid = 0x41
-            self._open_channel(
-                cert_acl_data_stream,
-                1,
-                cert_acl_handle,
-                scid,
-                psm,
-                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+        psm = 0x33
+        scid = 0x41
+        self._open_channel(
+            cert_acl_data_stream,
+            1,
+            cert_acl_handle,
+            scid,
+            psm,
+            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-            # TODO: Fix this test. It doesn't work so far with PDL struct
+        # TODO: Fix this test. It doesn't work so far with PDL struct
 
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_request)
-            asserts.skip("Struct not working")
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_request)
+        asserts.skip("Struct not working")
 
     def test_respond_configuration_request_ertm(self):
         """
@@ -1961,22 +1834,19 @@
         that specifies Enhanced Retransmission Mode.
         """
         cert_acl_handle = self._setup_link_from_cert()
-        with EventStream(
-                self.cert.hci_acl_manager.FetchAclData(
-                    empty_proto.Empty())) as cert_acl_data_stream:
-            cert_acl_data_stream.register_callback(self._handle_control_packet)
-            psm = 1
-            scid = 0x0101
-            self.retransmission_mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM
-            self.dut.l2cap.SetDynamicChannel(
-                l2cap_facade_pb2.SetEnableDynamicChannelRequest(
-                    psm=psm, retransmission_mode=self.retransmission_mode))
+        cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+        cert_acl_data_stream.register_callback(self._handle_control_packet)
+        psm = 1
+        scid = 0x0101
+        self.retransmission_mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM
+        self.dut.l2cap.SetDynamicChannel(
+            l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+                psm=psm, retransmission_mode=self.retransmission_mode))
 
-            open_channel = l2cap_packets.ConnectionRequestBuilder(1, psm, scid)
-            open_channel_l2cap = l2cap_packets.BasicFrameBuilder(
-                1, open_channel)
-            self.cert_send_b_frame(open_channel_l2cap)
+        open_channel = l2cap_packets.ConnectionRequestBuilder(1, psm, scid)
+        open_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, open_channel)
+        self.cert_send_b_frame(open_channel_l2cap)
 
-            # TODO: Verify that the type should be ERTM
-            cert_acl_data_stream.assert_event_occurs(
-                self.is_correct_configuration_response)
+        # TODO: Verify that the type should be ERTM
+        cert_acl_data_stream.assert_event_occurs(
+            self.is_correct_configuration_response)