| # |
| # Copyright 2019 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import time |
| from datetime import timedelta |
| from mobly import asserts |
| |
| from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass |
| from cert.event_stream import EventStream |
| from cert.truth import assertThat |
| from cert.closable import safeClose |
| from cert.py_l2cap import PyL2cap |
| from cert.py_acl_manager import PyAclManager |
| from cert.matchers import L2capMatchers |
| from facade import common_pb2 |
| from facade import rootservice_pb2 as facade_rootservice |
| from google.protobuf import empty_pb2 as empty_proto |
| from l2cap.classic import facade_pb2 as l2cap_facade_pb2 |
| from neighbor.facade import facade_pb2 as neighbor_facade |
| from hci.facade import acl_manager_facade_pb2 as acl_manager_facade |
| import bluetooth_packets_python3 as bt_packets |
| from bluetooth_packets_python3 import hci_packets, l2cap_packets |
| from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly |
| from bluetooth_packets_python3.l2cap_packets import Final |
| from bluetooth_packets_python3.l2cap_packets import CommandCode |
| from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction |
| from bluetooth_packets_python3.l2cap_packets import Poll |
| from cert_l2cap import CertL2cap |
| |
| # Assemble a sample packet. TODO: Use RawBuilder |
| SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) |
| |
| |
| class L2capTest(GdFacadeOnlyBaseTestClass): |
| |
| def setup_class(self): |
| super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES') |
| |
| def setup_test(self): |
| super().setup_test() |
| |
| self.dut.address = self.dut.hci_controller.GetMacAddress( |
| empty_proto.Empty()).address |
| cert_address = self.cert.controller_read_only_property.ReadLocalAddress( |
| empty_proto.Empty()).address |
| self.cert.address = cert_address |
| self.dut_address = common_pb2.BluetoothAddress(address=self.dut.address) |
| self.cert_address = common_pb2.BluetoothAddress( |
| address=self.cert.address) |
| |
| self.dut.neighbor.EnablePageScan( |
| neighbor_facade.EnableMsg(enabled=True)) |
| |
| self.dut_l2cap = PyL2cap(self.dut) |
| self.cert_l2cap = CertL2cap(self.cert) |
| self.cert_acl = None |
| |
| def teardown_test(self): |
| self.cert_l2cap.close() |
| super().teardown_test() |
| |
| def cert_send_b_frame(self, b_frame): |
| self.cert_acl.send(b_frame.Serialize()) |
| |
| def _setup_link_from_cert(self): |
| |
| self.dut.neighbor.EnablePageScan( |
| neighbor_facade.EnableMsg(enabled=True)) |
| self.cert_l2cap.connect_acl(self.dut.address) |
| self.cert_acl = self.cert_l2cap.get_acl() |
| |
| def _open_unvalidated_channel(self, |
| signal_id=1, |
| scid=0x0101, |
| psm=0x33, |
| use_ertm=False): |
| |
| mode = l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC |
| if use_ertm: |
| mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM |
| |
| dut_channel = self.dut_l2cap.open_channel(psm, mode) |
| cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid) |
| |
| return (dut_channel, cert_channel) |
| |
| def _open_channel(self, signal_id=1, scid=0x0101, psm=0x33, use_ertm=False): |
| result = self._open_unvalidated_channel(signal_id, scid, psm, use_ertm) |
| |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.ConfigurationResponse(), |
| L2capMatchers.ConfigurationRequest()).inAnyOrder() |
| |
| return result |
| |
| def test_connect_dynamic_channel_and_send_data(self): |
| self._setup_link_from_cert() |
| |
| (dut_channel, cert_channel) = self._open_channel(scid=0x41, psm=0x33) |
| |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits(L2capMatchers.Data(b'abc')) |
| |
| def test_fixed_channel(self): |
| self._setup_link_from_cert() |
| |
| self.dut.l2cap.RegisterChannel( |
| l2cap_facade_pb2.RegisterChannelRequest(channel=2)) |
| asserts.skip("FIXME: Not working") |
| self.dut.l2cap.SendL2capPacket( |
| l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123")) |
| |
| assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b'123')) |
| |
| def test_receive_packet_from_unknown_channel(self): |
| self._setup_link_from_cert() |
| |
| (dut_channel, cert_channel) = self._open_channel(scid=0x41, psm=0x33) |
| |
| i_frame = l2cap_packets.EnhancedInformationFrameBuilder( |
| 0x99, 0, Final.NOT_SET, 1, |
| l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) |
| self.cert_l2cap.send_acl(i_frame) |
| assertThat(cert_channel).emitsNone( |
| L2capMatchers.SFrame(req_seq=4), timeout=timedelta(seconds=1)) |
| |
| def test_open_two_channels(self): |
| self._setup_link_from_cert() |
| |
| self._open_channel(signal_id=1, scid=0x41, psm=0x41) |
| self._open_channel(signal_id=2, scid=0x43, psm=0x43) |
| |
| def test_connect_and_send_data_ertm_no_segmentation(self): |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc' * 34) |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0, payload=b'abc' * 34)) |
| |
| cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) |
| # todo verify received? |
| |
| def test_basic_operation_request_connection(self): |
| """ |
| L2CAP/COS/CED/BV-01-C [Request Connection] |
| Verify that the IUT is able to request the connection establishment for an L2CAP data channel and |
| initiate the configuration procedure. |
| """ |
| self._setup_link_from_cert() |
| |
| psm = 0x33 |
| # TODO: Use another test case |
| self.dut.l2cap.OpenChannel( |
| l2cap_facade_pb2.OpenChannelRequest( |
| remote=self.cert_address, psm=psm)) |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.ConnectionRequest()) |
| |
| def test_accept_disconnect(self): |
| """ |
| L2CAP/COS/CED/BV-07-C |
| """ |
| self._setup_link_from_cert() |
| |
| scid = 0x41 |
| psm = 0x33 |
| self._open_channel(scid=scid, psm=0x33) |
| |
| dcid = self.cert_l2cap.get_dcid(scid) |
| |
| close_channel = l2cap_packets.DisconnectionRequestBuilder(1, dcid, scid) |
| self.cert_l2cap.get_control_channel().send(close_channel) |
| |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.DisconnectionResponse(scid, dcid)) |
| |
| def test_disconnect_on_timeout(self): |
| """ |
| L2CAP/COS/CED/BV-08-C |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.ignore_config_and_connections() |
| |
| self._open_unvalidated_channel(scid=0x41, psm=0x33) |
| |
| assertThat(self.cert_l2cap.get_control_channel()).emitsNone( |
| L2capMatchers.ConfigurationResponse()) |
| |
| def test_retry_config_after_rejection(self): |
| """ |
| L2CAP/COS/CFD/BV-02-C |
| """ |
| self._setup_link_from_cert() |
| |
| self.cert_l2cap.reply_with_unacceptable_parameters() |
| |
| self._open_unvalidated_channel(scid=0x41, psm=0x33) |
| |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.ConfigurationResponse()) |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.ConfigurationRequest(), at_least_times=2) |
| |
| def test_config_unknown_options_with_hint(self): |
| """ |
| L2CAP/COS/CFD/BV-12-C |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.reply_with_unknown_options_and_hint() |
| |
| self._open_unvalidated_channel(scid=0x41, psm=0x33) |
| |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.ConfigurationResponse()) |
| |
| def test_respond_to_echo_request(self): |
| """ |
| L2CAP/COS/ECH/BV-01-C [Respond to Echo Request] |
| Verify that the IUT responds to an echo request. |
| """ |
| self._setup_link_from_cert() |
| asserts.skip("is echo without a channel supported?") |
| |
| echo_request = l2cap_packets.EchoRequestBuilder( |
| 100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3)) |
| echo_request_l2cap = l2cap_packets.BasicFrameBuilder(1, echo_request) |
| self.cert_send_b_frame(echo_request_l2cap) |
| |
| assertThat(self.cert_channel).emits( |
| L2capMatchers.PartialData(b"\x06\x01\x04\x00\x02\x00\x03\x00")) |
| |
| def test_reject_unknown_command(self): |
| """ |
| L2CAP/COS/CED/BI-01-C |
| """ |
| self._setup_link_from_cert() |
| |
| asserts.skip("need to use packet builders") |
| invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00" |
| self.cert_l2cap.get_control_channel().send(invalid_command_packet) |
| |
| assertThat(self.cert_channel).emits(L2capMatchers.CommandReject()) |
| |
| def test_query_for_1_2_features(self): |
| """ |
| L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features] |
| """ |
| self._setup_link_from_cert() |
| signal_id = 3 |
| information_request = l2cap_packets.InformationRequestBuilder( |
| signal_id, |
| l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED |
| ) |
| information_request_l2cap = l2cap_packets.BasicFrameBuilder( |
| 1, information_request) |
| self.cert_send_b_frame(information_request_l2cap) |
| |
| def is_correct_information_response(l2cap_packet): |
| l2cap_control_view = l2cap_packets.ControlView( |
| l2cap_packet.GetPayload()) |
| if l2cap_control_view.GetCode( |
| ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: |
| return False |
| information_response_view = l2cap_packets.InformationResponseView( |
| l2cap_control_view) |
| return information_response_view.GetInfoType( |
| ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED |
| |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| is_correct_information_response) |
| |
| def test_extended_feature_info_response_ertm(self): |
| """ |
| L2CAP/EXF/BV-01-C [Extended Features Information Response for Enhanced |
| Retransmission Mode] |
| """ |
| self._setup_link_from_cert() |
| |
| signal_id = 3 |
| information_request = l2cap_packets.InformationRequestBuilder( |
| signal_id, |
| l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED |
| ) |
| information_request_l2cap = l2cap_packets.BasicFrameBuilder( |
| 1, information_request) |
| self.cert_send_b_frame(information_request_l2cap) |
| |
| def is_correct_information_response(l2cap_packet): |
| l2cap_control_view = l2cap_packets.ControlView( |
| l2cap_packet.GetPayload()) |
| if l2cap_control_view.GetCode( |
| ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: |
| return False |
| information_response_view = l2cap_packets.InformationResponseView( |
| l2cap_control_view) |
| if information_response_view.GetInfoType( |
| ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: |
| return False |
| extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( |
| information_response_view) |
| return extended_features_view.GetEnhancedRetransmissionMode() |
| |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| is_correct_information_response) |
| |
| def test_extended_feature_info_response_streaming(self): |
| """ |
| L2CAP/EXF/BV-02-C |
| """ |
| asserts.skip("Streaming not supported") |
| self._setup_link_from_cert() |
| |
| signal_id = 3 |
| information_request = l2cap_packets.InformationRequestBuilder( |
| signal_id, |
| l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED |
| ) |
| information_request_l2cap = l2cap_packets.BasicFrameBuilder( |
| 1, information_request) |
| self.cert_send_b_frame(information_request_l2cap) |
| |
| def is_correct_information_response(l2cap_packet): |
| packet_bytes = l2cap_packet.payload |
| l2cap_view = l2cap_packets.BasicFrameView( |
| bt_packets.PacketViewLittleEndian(list(packet_bytes))) |
| if l2cap_view.GetChannelId() != 1: |
| return False |
| l2cap_control_view = l2cap_packets.ControlView( |
| l2cap_view.GetPayload()) |
| if l2cap_control_view.GetCode( |
| ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: |
| return False |
| information_response_view = l2cap_packets.InformationResponseView( |
| l2cap_control_view) |
| if information_response_view.GetInfoType( |
| ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: |
| return False |
| extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( |
| information_response_view) |
| return extended_features_view.GetStreamingMode() |
| |
| assertThat(self.cert_acl).emits(is_correct_information_response) |
| |
| def test_extended_feature_info_response_fcs(self): |
| """ |
| L2CAP/EXF/BV-03-C [Extended Features Information Response for FCS Option] |
| Note: This is not mandated by L2CAP Spec |
| """ |
| self._setup_link_from_cert() |
| |
| signal_id = 3 |
| information_request = l2cap_packets.InformationRequestBuilder( |
| signal_id, |
| l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED |
| ) |
| information_request_l2cap = l2cap_packets.BasicFrameBuilder( |
| 1, information_request) |
| self.cert_send_b_frame(information_request_l2cap) |
| |
| def is_correct_information_response(l2cap_packet): |
| l2cap_control_view = l2cap_packets.ControlView( |
| l2cap_packet.GetPayload()) |
| if l2cap_control_view.GetCode( |
| ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: |
| return False |
| information_response_view = l2cap_packets.InformationResponseView( |
| l2cap_control_view) |
| if information_response_view.GetInfoType( |
| ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: |
| return False |
| extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( |
| information_response_view) |
| return extended_features_view.GetFcsOption() |
| |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| is_correct_information_response) |
| |
| def test_extended_feature_info_response_fixed_channels(self): |
| """ |
| L2CAP/EXF/BV-05-C |
| """ |
| self._setup_link_from_cert() |
| |
| signal_id = 3 |
| information_request = l2cap_packets.InformationRequestBuilder( |
| signal_id, |
| l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED |
| ) |
| information_request_l2cap = l2cap_packets.BasicFrameBuilder( |
| 1, information_request) |
| self.cert_send_b_frame(information_request_l2cap) |
| |
| def is_correct_information_response(l2cap_packet): |
| packet_bytes = l2cap_packet.payload |
| l2cap_view = l2cap_packets.BasicFrameView( |
| bt_packets.PacketViewLittleEndian(list(packet_bytes))) |
| if l2cap_view.GetChannelId() != 1: |
| return False |
| l2cap_control_view = l2cap_packets.ControlView( |
| l2cap_view.GetPayload()) |
| if l2cap_control_view.GetCode( |
| ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: |
| return False |
| information_response_view = l2cap_packets.InformationResponseView( |
| l2cap_control_view) |
| if information_response_view.GetInfoType( |
| ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: |
| return False |
| extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( |
| information_response_view) |
| return extended_features_view.GetFixedChannels() |
| |
| assertThat(self.cert_acl).emits(is_correct_information_response) |
| |
| def test_config_channel_not_use_FCS(self): |
| """ |
| L2CAP/FOC/BV-01-C [IUT Initiated Configuration of the FCS Option] |
| Verify the IUT can configure a channel to not use FCS in I/S-frames. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0, payload=b'abc')) |
| |
| def test_explicitly_request_use_FCS(self): |
| """ |
| L2CAP/FOC/BV-02-C [Lower Tester Explicitly Requests FCS should be Used] |
| Verify the IUT will include the FCS in I/S-frames if the Lower Tester explicitly requests that FCS |
| should be used. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| self.cert_l2cap.turn_on_fcs() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits( |
| L2capMatchers.PartialData( |
| b"abc\x4f\xa3")) # TODO: Use packet parser |
| |
| def test_implicitly_request_use_FCS(self): |
| """ |
| L2CAP/FOC/BV-03-C [Lower Tester Implicitly Requests FCS should be Used] |
| TODO: Update this test case. What's the difference between this one and test_explicitly_request_use_FCS? |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| self.cert_l2cap.turn_on_fcs() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits( |
| L2capMatchers.PartialData( |
| b"abc\x4f\xa3")) # TODO: Use packet parser |
| |
| def test_transmit_i_frames(self): |
| """ |
| L2CAP/ERM/BV-01-C [Transmit I-frames] |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) |
| |
| # Assemble a sample packet. TODO: Use RawBuilder |
| SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) |
| |
| # todo: verify packet received? |
| cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) |
| |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=1, payload=b"abc")) |
| |
| cert_channel.send_i_frame(tx_seq=1, req_seq=2, payload=SAMPLE_PACKET) |
| |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits(L2capMatchers.PartialData(b"abc")) |
| |
| cert_channel.send_i_frame(tx_seq=2, req_seq=3, payload=SAMPLE_PACKET) |
| |
| def test_receive_i_frames(self): |
| """ |
| L2CAP/ERM/BV-02-C [Receive I-Frames] |
| Verify the IUT can receive in-sequence valid I-frames and deliver L2CAP SDUs to the Upper Tester |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| for i in range(3): |
| cert_channel.send_i_frame( |
| tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) |
| assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=i + 1)) |
| |
| cert_channel.send_i_frame( |
| tx_seq=3, |
| req_seq=0, |
| sar=SegmentationAndReassembly.START, |
| payload=SAMPLE_PACKET) |
| assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=4)) |
| |
| cert_channel.send_i_frame( |
| tx_seq=4, |
| req_seq=0, |
| sar=SegmentationAndReassembly.CONTINUATION, |
| payload=SAMPLE_PACKET) |
| assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=5)) |
| |
| cert_channel.send_i_frame( |
| tx_seq=5, |
| req_seq=0, |
| sar=SegmentationAndReassembly.END, |
| payload=SAMPLE_PACKET) |
| assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=6)) |
| |
| def test_acknowledging_received_i_frames(self): |
| """ |
| L2CAP/ERM/BV-03-C [Acknowledging Received I-Frames] |
| Verify the IUT sends S-frame [RR] with the Poll bit not set to acknowledge data received from the |
| Lower Tester |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| for i in range(3): |
| cert_channel.send_i_frame( |
| tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) |
| assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=i + 1)) |
| |
| assertThat(cert_channel).emitsNone( |
| L2capMatchers.SFrame(req_seq=4), timeout=timedelta(seconds=1)) |
| |
| def test_resume_transmitting_when_received_rr(self): |
| """ |
| L2CAP/ERM/BV-05-C [Resume Transmitting I-Frames when an S-Frame [RR] is Received] |
| Verify the IUT will cease transmission of I-frames when the negotiated TxWindow is full. Verify the |
| IUT will resume transmission of I-frames when an S-frame [RR] is received that acknowledges |
| previously sent I-frames. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm(tx_window_size=1) |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| dut_channel.send(b'def') |
| |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0, payload=b'abc')) |
| assertThat(cert_channel).emitsNone( |
| L2capMatchers.IFrame(tx_seq=1, payload=b'def')) |
| |
| cert_channel.send_s_frame(req_seq=1, f=Final.POLL_RESPONSE) |
| assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1)) |
| |
| def test_resume_transmitting_when_acknowledge_previously_sent(self): |
| """ |
| L2CAP/ERM/BV-06-C [Resume Transmitting I-Frames when an I-Frame is Received] |
| Verify the IUT will cease transmission of I-frames when the negotiated TxWindow is full. Verify the |
| IUT will resume transmission of I-frames when an I-frame is received that acknowledges previously |
| sent I-frames. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm(tx_window_size=1) |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| dut_channel.send(b'def') |
| |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0, payload=b'abc')) |
| # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout |
| assertThat(cert_channel).emitsNone( |
| L2capMatchers.IFrame(tx_seq=1, payload=b'abc'), |
| timeout=timedelta(seconds=1)) |
| |
| cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) |
| |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=1, payload=b'def')) |
| |
| cert_channel.send_i_frame(tx_seq=1, req_seq=2, payload=SAMPLE_PACKET) |
| |
| def test_transmit_s_frame_rr_with_poll_bit_set(self): |
| """ |
| L2CAP/ERM/BV-08-C [Send S-Frame [RR] with Poll Bit Set] |
| Verify the IUT sends an S-frame [RR] with the Poll bit set when its retransmission timer expires. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| # TODO: Always use their retransmission timeout value |
| time.sleep(2) |
| assertThat(cert_channel).emits( |
| L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)) |
| |
| def test_transmit_s_frame_rr_with_final_bit_set(self): |
| """ |
| L2CAP/ERM/BV-09-C [Send S-Frame [RR] with Final Bit Set] |
| Verify the IUT responds with an S-frame [RR] with the Final bit set after receiving an S-frame [RR] |
| with the Poll bit set. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| cert_channel.send_s_frame(req_seq=0, p=Poll.POLL) |
| assertThat(cert_channel).emits( |
| L2capMatchers.SFrame(f=Final.POLL_RESPONSE)) |
| |
| def test_s_frame_transmissions_exceed_max_transmit(self): |
| """ |
| L2CAP/ERM/BV-11-C [S-Frame Transmissions Exceed MaxTransmit] |
| Verify the IUT will close the channel when the Monitor Timer expires. |
| """ |
| asserts.skip("Need to configure DUT to have a shorter timer") |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| |
| # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362 |
| time.sleep(362) |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.DisconnectionRequest()) |
| |
| def test_i_frame_transmissions_exceed_max_transmit(self): |
| """ |
| L2CAP/ERM/BV-12-C [I-Frame Transmissions Exceed MaxTransmit] |
| Verify the IUT will close the channel when it receives an S-frame [RR] with the final bit set that does |
| not acknowledge the previous I-frame sent by the IUT. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) |
| |
| cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE) |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.DisconnectionRequest()) |
| |
| def test_respond_to_rej(self): |
| """ |
| L2CAP/ERM/BV-13-C [Respond to S-Frame [REJ]] |
| Verify the IUT retransmits I-frames starting from the sequence number specified in the S-frame [REJ]. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm(tx_window_size=2, max_transmit=2) |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), |
| L2capMatchers.IFrame(tx_seq=1, payload=b'abc')).inOrder() |
| |
| cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT) |
| |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), |
| L2capMatchers.IFrame(tx_seq=1, payload=b'abc')).inOrder() |
| |
| def test_receive_s_frame_rr_final_bit_set(self): |
| """ |
| L2CAP/ERM/BV-18-C [Receive S-Frame [RR] Final Bit = 1] |
| Verify the IUT will retransmit any previously sent I-frames unacknowledged by receipt of an S-Frame |
| [RR] with the Final Bit set. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| |
| # TODO: Always use their retransmission timeout value |
| time.sleep(2) |
| assertThat(cert_channel).emits( |
| L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)) |
| |
| cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE) |
| assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) |
| |
| def test_receive_i_frame_final_bit_set(self): |
| """ |
| L2CAP/ERM/BV-19-C [Receive I-Frame Final Bit = 1] |
| Verify the IUT will retransmit any previously sent I-frames unacknowledged by receipt of an I-frame |
| with the final bit set. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| |
| # TODO: Always use their retransmission timeout value |
| time.sleep(2) |
| assertThat(cert_channel).emits(L2capMatchers.SFrame(p=Poll.POLL)) |
| |
| cert_channel.send_i_frame( |
| tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET) |
| |
| assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) |
| |
| def test_recieve_rnr(self): |
| """ |
| L2CAP/ERM/BV-20-C [Enter Remote Busy Condition] |
| Verify the IUT will not retransmit any I-frames when it receives a remote busy indication from the |
| Lower Tester (S-frame [RNR]). |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| |
| # TODO: Always use their retransmission timeout value |
| time.sleep(2) |
| assertThat(cert_channel).emits( |
| L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)) |
| |
| cert_channel.send_s_frame( |
| req_seq=0, |
| s=SupervisoryFunction.RECEIVER_NOT_READY, |
| f=Final.POLL_RESPONSE) |
| assertThat(cert_channel).emitsNone(L2capMatchers.IFrame(tx_seq=0)) |
| |
| def test_sent_rej_lost(self): |
| """ |
| L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted] |
| Verify the IUT can handle receipt of an S-=frame [RR] Poll = 1 if the S-frame [REJ] sent from the IUT |
| is lost. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm(tx_window_size=5) |
| ertm_tx_window_size = 5 |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x41, use_ertm=True) |
| |
| cert_channel.send_i_frame(tx_seq=0, req_seq=0, payload=SAMPLE_PACKET) |
| assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=1)) |
| |
| cert_channel.send_i_frame( |
| tx_seq=ertm_tx_window_size - 1, req_seq=0, payload=SAMPLE_PACKET) |
| assertThat(cert_channel).emits( |
| L2capMatchers.SFrame(s=SupervisoryFunction.REJECT)) |
| |
| cert_channel.send_s_frame(req_seq=0, p=Poll.POLL) |
| |
| assertThat(cert_channel).emits( |
| L2capMatchers.SFrame( |
| req_seq=1, f=l2cap_packets.Final.POLL_RESPONSE)) |
| for i in range(1, ertm_tx_window_size): |
| cert_channel.send_i_frame( |
| tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) |
| assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=i + 1)) |
| |
| def test_handle_duplicate_srej(self): |
| """ |
| L2CAP/ERM/BI-03-C [Handle Duplicate S-Frame [SREJ]] |
| Verify the IUT will only retransmit the requested I-frame once after receiving a duplicate SREJ. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0), |
| L2capMatchers.IFrame(tx_seq=1), |
| L2capMatchers.SFrame(p=Poll.POLL)).inOrder() |
| |
| cert_channel.send_s_frame( |
| req_seq=0, s=SupervisoryFunction.SELECT_REJECT) |
| assertThat(cert_channel).emitsNone(timeout=timedelta(seconds=0.5)) |
| |
| cert_channel.send_s_frame( |
| req_seq=0, |
| s=SupervisoryFunction.SELECT_REJECT, |
| f=Final.POLL_RESPONSE) |
| assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) |
| |
| def test_handle_receipt_rej_and_rr_with_f_set(self): |
| """ |
| L2CAP/ERM/BI-04-C [Handle Receipt of S-Frame [REJ] and S-Frame [RR, F=1] that Both Require Retransmission of the Same I-Frames] |
| Verify the IUT will only retransmit the requested I-frames once after receiving an S-frame [REJ] |
| followed by an S-frame [RR] with the Final bit set that indicates the same I-frames should be |
| retransmitted. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0), |
| L2capMatchers.IFrame(tx_seq=1), |
| L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)).inOrder() |
| |
| cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT) |
| assertThat(cert_channel).emitsNone(timeout=timedelta(seconds=0.5)) |
| |
| # Send RR with F set |
| cert_channel.send_s_frame( |
| req_seq=0, s=SupervisoryFunction.REJECT, f=Final.POLL_RESPONSE) |
| assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) |
| assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1)) |
| |
| def test_handle_rej_and_i_frame_with_f_set(self): |
| """ |
| L2CAP/ERM/BI-05-C [Handle receipt of S-Frame [REJ] and I-Frame [F=1] that Both Require Retransmission of the Same I-Frames] |
| Verify the IUT will only retransmit the requested I-frames once after receiving an S-frame [REJ] |
| followed by an I-frame with the Final bit set that indicates the same I-frames should be retransmitted. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| (dut_channel, cert_channel) = self._open_channel( |
| scid=0x41, psm=0x33, use_ertm=True) |
| |
| dut_channel.send(b'abc') |
| dut_channel.send(b'abc') |
| assertThat(cert_channel).emits( |
| L2capMatchers.IFrame(tx_seq=0), |
| L2capMatchers.IFrame(tx_seq=1), |
| L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)).inOrder() |
| |
| # Send SREJ with F not set |
| cert_channel.send_s_frame( |
| req_seq=0, s=SupervisoryFunction.SELECT_REJECT) |
| assertThat(cert_channel).emitsNone(timeout=timedelta(seconds=0.5)) |
| |
| cert_channel.send_i_frame( |
| tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET) |
| |
| assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) |
| assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1)) |
| |
| def test_initiated_configuration_request_ertm(self): |
| """ |
| L2CAP/CMC/BV-01-C [IUT Initiated Configuration of Enhanced Retransmission Mode] |
| Verify the IUT can send a Configuration Request command containing the F&EC option that specifies |
| Enhanced Retransmission Mode. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.turn_on_ertm() |
| |
| self._open_unvalidated_channel(scid=0x41, psm=0x33, use_ertm=True) |
| |
| # TODO: Fix this test. It doesn't work so far with PDL struct |
| |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.ConfigurationRequest()) |
| asserts.skip("Struct not working") |
| |
| def test_respond_configuration_request_ertm(self): |
| """ |
| L2CAP/CMC/BV-02-C [Lower Tester Initiated Configuration of Enhanced Retransmission Mode] |
| Verify the IUT can accept a Configuration Request from the Lower Tester containing an F&EC option |
| that specifies Enhanced Retransmission Mode. |
| """ |
| self._setup_link_from_cert() |
| |
| psm = 1 |
| scid = 0x0101 |
| self.retransmission_mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM |
| self.dut.l2cap.SetDynamicChannel( |
| l2cap_facade_pb2.SetEnableDynamicChannelRequest( |
| psm=psm, retransmission_mode=self.retransmission_mode)) |
| |
| open_channel = l2cap_packets.ConnectionRequestBuilder(1, psm, scid) |
| open_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, open_channel) |
| self.cert_send_b_frame(open_channel_l2cap) |
| |
| # TODO: Verify that the type should be ERTM |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.ConfigurationResponse()) |