| # |
| # Copyright 2020 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from cert.truth import assertThat |
| from cert.py_l2cap import PyLeL2cap |
| from cert.matchers import L2capMatchers |
| from cert.metadata import metadata |
| from facade import common_pb2 as common |
| from google.protobuf import empty_pb2 as empty_proto |
| from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade |
| from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade |
| from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade |
| import bluetooth_packets_python3 as bt_packets |
| from bluetooth_packets_python3 import hci_packets, l2cap_packets |
| from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult |
| from l2cap.le.cert.cert_le_l2cap import CertLeL2cap |
| from l2cap.le.facade_pb2 import SecurityLevel |
| |
| # Assemble a sample packet. |
| SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17]) |
| |
| |
| class LeL2capTestBase(): |
| |
| def setup_class(self): |
| GdBaseTestClass.setup_class(self, dut_module='L2CAP', cert_module='HCI_INTERFACES') |
| |
| def setup_test(self, dut, cert): |
| self.dut = dut |
| self.cert = cert |
| |
| self.dut_l2cap = PyLeL2cap(self.dut) |
| self.cert_l2cap = CertLeL2cap(self.cert) |
| self.dut_address = common.BluetoothAddressWithType( |
| address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS) |
| self.cert_address = common.BluetoothAddressWithType( |
| address=common.BluetoothAddress(address=bytes(b'C0:11:FF:AA:33:22')), type=common.RANDOM_DEVICE_ADDRESS) |
| dut_privacy_policy = le_initiator_address_facade.PrivacyPolicy( |
| address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, |
| address_with_type=self.dut_address, |
| rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', |
| minimum_rotation_time=0, |
| maximum_rotation_time=0) |
| self.dut_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(dut_privacy_policy) |
| privacy_policy = le_initiator_address_facade.PrivacyPolicy( |
| address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, |
| address_with_type=self.cert_address, |
| rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', |
| minimum_rotation_time=0, |
| maximum_rotation_time=0) |
| self.cert_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) |
| |
| def teardown_test(self): |
| self.cert_l2cap.close() |
| self.dut_l2cap.close() |
| |
| def _setup_link_from_cert(self): |
| # DUT Advertises |
| gap_name = hci_packets.GapData() |
| gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME |
| gap_name.data = list(bytes(b'Im_The_DUT')) |
| gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) |
| config = le_advertising_facade.AdvertisingConfig( |
| advertisement=[gap_data], |
| interval_min=512, |
| interval_max=768, |
| advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, |
| own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, |
| channel_map=7, |
| filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) |
| request = le_advertising_facade.CreateAdvertiserRequest(config=config) |
| create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request) |
| self.cert_l2cap.connect_le_acl(self.dut_address) |
| |
| def _set_link_from_dut_and_open_channel(self, |
| signal_id=1, |
| scid=0x0101, |
| psm=0x33, |
| mtu=1000, |
| mps=100, |
| initial_credit=6): |
| # Cert Advertises |
| gap_name = hci_packets.GapData() |
| gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME |
| gap_name.data = list(bytes(b'Im_The_DUT')) |
| gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) |
| config = le_advertising_facade.AdvertisingConfig( |
| advertisement=[gap_data], |
| interval_min=512, |
| interval_max=768, |
| advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, |
| own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, |
| channel_map=7, |
| filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) |
| request = le_advertising_facade.CreateAdvertiserRequest(config=config) |
| create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request) |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm) |
| self.cert_l2cap.wait_for_connection() |
| # TODO: Currently we can only connect by using Dynamic channel API. Use fixed channel instead. |
| cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(psm) |
| dut_channel = response_future.get_channel() |
| return (dut_channel, cert_channel) |
| |
| def _open_channel_from_cert(self, signal_id=1, scid=0x0101, psm=0x33, mtu=1000, mps=100, initial_credit=6): |
| |
| dut_channel = self.dut_l2cap.register_coc(self.cert_address, psm) |
| cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, mtu, mps, initial_credit) |
| |
| return (dut_channel, cert_channel) |
| |
| def _open_channel_from_dut(self, psm=0x33): |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm) |
| cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(psm) |
| dut_channel = response_future.get_channel() |
| return (dut_channel, cert_channel) |
| |
| def _open_fixed_channel(self, cid=4): |
| dut_channel = self.dut_l2cap.get_fixed_channel(cid) |
| cert_channel = self.cert_l2cap.open_fixed_channel(cid) |
| return (dut_channel, cert_channel) |
| |
| def test_fixed_channel_send(self): |
| self.dut_l2cap.enable_fixed_channel(4) |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_fixed_channel(4) |
| dut_channel.send(b'hello' * 40) |
| assertThat(cert_channel).emits(L2capMatchers.Data(b'hello' * 40)) |
| |
| def test_fixed_channel_receive(self): |
| self.dut_l2cap.enable_fixed_channel(4) |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_fixed_channel(4) |
| cert_channel.send(SAMPLE_PACKET) |
| assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) |
| |
| def test_connect_from_dut_and_open_dynamic_channel(self): |
| """ |
| Internal test for GD stack only |
| """ |
| self._set_link_from_dut_and_open_channel() |
| |
| @metadata(pts_test_id="L2CAP/LE/CPU/BV-01-C", pts_test_name="Send Connection Parameter Update Request") |
| def test_send_connection_parameter_update_request(self): |
| """ |
| Verify that the IUT is able to send the connection parameter update Request to Lower Tester when acting as a peripheral device. |
| NOTE: This is an optional feature. Also if both LL central and peripheral supports 4.1+ connection parameter update, this should happen in LL only, not L2CAP |
| NOTE: Currently we need to establish at least one dynamic channel to allow update. |
| """ |
| self._setup_link_from_cert() |
| self._open_channel_from_dut() |
| self.dut_l2cap.update_connection_parameter() |
| assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.LeConnectionParameterUpdateRequest()) |
| |
| @metadata(pts_test_id="L2CAP/LE/CPU/BV-02-C", pts_test_name="Accept Connection Parameter Update Request") |
| def test_accept_connection_parameter_update_request(self): |
| """ |
| Verify that the IUT is able to receive and handle a request for connection parameter update when acting as a central device. |
| NOTE: Currently we need to establish at least one dynamic channel to allow update. |
| """ |
| self._set_link_from_dut_and_open_channel() |
| self.cert_l2cap.get_control_channel().send( |
| l2cap_packets.ConnectionParameterUpdateRequestBuilder(2, 0x10, 0x10, 0x0a, 0x64)) |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.LeConnectionParameterUpdateResponse( |
| l2cap_packets.ConnectionParameterUpdateResponseResult.ACCEPTED)) |
| |
| @metadata(pts_test_id="L2CAP/LE/CPU/BI-01-C", pts_test_name="Reject Connection Parameter Update Parameters") |
| def test_reject_connection_parameter_update_parameters(self): |
| """ |
| Verify that the IUT is able to reject a request for connection parameter update with illegal parameters. |
| NOTE: Currently we need to establish at least one dynamic channel to allow update. |
| """ |
| self._set_link_from_dut_and_open_channel() |
| self.cert_l2cap.get_control_channel().send( |
| l2cap_packets.ConnectionParameterUpdateRequestBuilder(2, 0x10, 0x10, 512, 0x64)) |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.LeConnectionParameterUpdateResponse( |
| l2cap_packets.ConnectionParameterUpdateResponseResult.REJECTED)) |
| |
| @metadata(pts_test_id="L2CAP/LE/CPU/BI-02-C", pts_test_name="Reject Connection Parameter Update Request") |
| def test_reject_connection_parameter_update_request(self): |
| """ |
| Verify that the IUT is able to reject a request for connection parameter update in peripheral mode. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.get_control_channel().send( |
| l2cap_packets.ConnectionParameterUpdateRequestBuilder(2, 0x10, 0x10, 0x0a, 0x64)) |
| assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.LeCommandReject()) |
| |
| @metadata(pts_test_id="L2CAP/COS/CFC/BV-01-C", pts_test_name="Segmentation") |
| def test_segmentation(self): |
| """ |
| Verify that the IUT can send data segments which are larger than the LE frame size. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert(mtu=1000, mps=102) |
| dut_channel.send(b'hello' * 20 + b'world') |
| # The first LeInformation packet contains 2 bytes of SDU size. |
| # The packet is divided into first 100 bytes from 'hellohello....' |
| # and remaining 5 bytes 'world' |
| assertThat(cert_channel).emits( |
| L2capMatchers.FirstLeIFrame(b'hello' * 20, sdu_size=105), L2capMatchers.Data(b'world')).inOrder() |
| |
| @metadata(pts_test_id="L2CAP/COS/CFC/BV-02-C", pts_test_name="No Segmentation") |
| def test_no_segmentation(self): |
| """ |
| Verify that the IUT can send data segments which do not require segmentation. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert(mtu=1000, mps=202) |
| dut_channel.send(b'hello' * 40) |
| assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello' * 40, sdu_size=200)) |
| |
| def test_no_segmentation_dut_is_central(self): |
| """ |
| L2CAP/COS/CFC/BV-02-C |
| """ |
| (dut_channel, cert_channel) = self._set_link_from_dut_and_open_channel() |
| dut_channel.send(b'hello' * 40) |
| assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello' * 40, sdu_size=200)) |
| |
| @metadata(pts_test_id="L2CAP/COS/CFC/BV-03-C", pts_test_name="Reassembling") |
| def test_reassembling(self): |
| """ |
| Verify that the IUT can correctly reassemble data received from the Lower Tester which is greater than the IUT LE-frame size. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert() |
| sdu_size_for_two_sample_packet = 8 |
| cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet, SAMPLE_PACKET) |
| cert_channel.send(SAMPLE_PACKET) |
| assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17' * 2)) |
| |
| @metadata(pts_test_id="L2CAP/COS/CFC/BV-04-C", pts_test_name="Data Receiving") |
| def test_data_receiving(self): |
| """ |
| Verify that the IUT can receive unsegmented data correctly. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert() |
| cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) |
| assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) |
| |
| def test_data_receiving_dut_is_central(self): |
| """ |
| L2CAP/COS/CFC/BV-04-C |
| """ |
| (dut_channel, cert_channel) = self._set_link_from_dut_and_open_channel() |
| cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) |
| assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) |
| |
| @metadata(pts_test_id="L2CAP/COS/CFC/BV-05-C", pts_test_name="Multiple Channels with Interleaved Data Streams") |
| def test_multiple_channels_with_interleaved_data_streams(self): |
| """ |
| Verify that an IUT can create multiple channels and receives data streams on the channels when the streams are interleaved. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel_x, cert_channel_x) = self._open_channel_from_cert(signal_id=1, scid=0x0103, psm=0x33) |
| (dut_channel_y, cert_channel_y) = self._open_channel_from_cert(signal_id=2, scid=0x0105, psm=0x35) |
| (dut_channel_z, cert_channel_z) = self._open_channel_from_cert(signal_id=3, scid=0x0107, psm=0x37) |
| cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET) |
| cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET) |
| cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET) |
| cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET) |
| cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET) |
| # TODO: We should assert two events in order, but it got stuck |
| assertThat(dut_channel_y).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'), at_least_times=3) |
| assertThat(dut_channel_z).emits( |
| L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'), |
| L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')).inOrder() |
| cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET) |
| assertThat(dut_channel_z).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) |
| |
| @metadata(pts_test_id="L2CAP/LE/REJ/BI-01-C", pts_test_name="Reject Unknown Command in LE Signaling Channel") |
| def test_reject_unknown_command_in_le_sigling_channel(self): |
| """ |
| Verify that the IUT is able to reject unknown command. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.get_control_channel().send( |
| l2cap_packets.InformationRequestBuilder( |
| 2, l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)) |
| assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.LeCommandReject()) |
| |
| @metadata(pts_test_id="L2CAP/LE/REJ/BI-02-C", pts_test_name="Command Reject – Reserved PDU Codes") |
| def test_command_reject_reserved_pdu_codes(self): |
| """ |
| Verify that an IUT receiving a PDU with a reserved command code sends a command reject. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.get_control_channel().send(l2cap_packets.MoveChannelRequestBuilder(2, 0, 0)) |
| assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.LeCommandReject()) |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-01-C", pts_test_name="LE Credit Based Connection Request - Legacy Peer") |
| def test_le_credit_based_connection_request_legacy_peer(self): |
| """ |
| Verify that an IUT sending an LE Credit Based Connection Request to a legacy peer and receiving a Command Reject does not establish the channel. |
| """ |
| self._setup_link_from_cert() |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33) |
| self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33) |
| assertThat(response_future.get_status()).isNotEqualTo(LeCreditBasedConnectionResponseResult.SUCCESS) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-02-C", pts_test_name="LE Credit Based Connection Request on Supported LE_PSM") |
| def test_le_credit_based_connection_request_on_supported_le_psm(self): |
| """ |
| Verify that an IUT sending an LE Credit Based Connection Request to a peer will establish the channel upon receiving the LE Credit Based Connection Response. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_dut() |
| cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) |
| assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-03-C", pts_test_name="LE Credit Based Connection Response on Supported LE_PSM") |
| def test_credit_based_connection_response_on_supported_le_psm(self): |
| """ |
| Verify that an IUT receiving a valid LE Credit Based Connection Request from a peer will send an LE Credit Based Connection Response and establish the channel. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert() |
| dut_channel.send(b'hello') |
| assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-04-C", pts_test_name="LE Credit Based Connection Request on an Unsupported LE_PSM") |
| def test_credit_based_connection_request_on_an_unsupported_le_psm(self): |
| """ |
| Verify that an IUT sending an LE Credit Based Connection Request on an unsupported LE_PSM will not establish a channel upon receiving an LE Credit Based Connection Response refusing the connection. |
| """ |
| self._setup_link_from_cert() |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33) |
| self.cert_l2cap.verify_and_respond_open_channel_from_remote( |
| psm=0x33, result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) |
| assertThat(response_future.get_status()).isEqualTo(LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-05-C", pts_test_name="LE Credit Based Connection Request - unsupported LE_PSM") |
| def test_credit_based_connection_request_unsupported_le_psm(self): |
| """ |
| Verify that an IUT receiving an LE Credit Based Connection Request on an unsupported LE_PSM will respond with an LE Credit Based Connection Response refusing the connection. |
| """ |
| self._setup_link_from_cert() |
| self.cert_l2cap.get_control_channel().send( |
| l2cap_packets.LeCreditBasedConnectionRequestBuilder(1, 0x34, 0x0101, 2000, 1000, 1000)) |
| assertThat(self.cert_l2cap.get_control_channel()).emits( |
| L2capMatchers.CreditBasedConnectionResponse( |
| result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)) |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-06-C", pts_test_name="Credit Exchange – Receiving Incremental Credits") |
| def test_credit_exchange_receiving_incremental_credits(self): |
| """ |
| Verify the IUT handles flow control correctly, by handling the LE Flow Control Credit sent by the peer. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert(initial_credit=0) |
| for _ in range(4): |
| dut_channel.send(b'hello') |
| cert_channel.send_credits(1) |
| assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) |
| cert_channel.send_credits(1) |
| assertThat(cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) |
| cert_channel.send_credits(2) |
| assertThat(cert_channel).emits( |
| L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5), L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-07-C", pts_test_name="Credit Exchange – Sending Credits") |
| def test_credit_exchange_sending_credits(self): |
| """ |
| Verify that the IUT sends LE Flow Control Credit to the peer. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert() |
| credits = cert_channel.credits_left() |
| # Note: DUT only needs to send credit when ALL credits are consumed. |
| # Here we enforce that DUT sends credit after receiving 3 packets, to |
| # test without sending too many packets (may take too long). |
| # This behavior is not expected for all Bluetooth stacks. |
| for _ in range(min(credits + 1, 3)): |
| cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) |
| self.cert_l2cap.verify_le_flow_control_credit(cert_channel) |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-08-C", pts_test_name="Disconnection Request") |
| def test_disconnection_request(self): |
| """ |
| Verify that the IUT can disconnect the channel. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert() |
| dut_channel.close_channel() |
| cert_channel.verify_disconnect_request() |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-09-C", pts_test_name="Disconnection Response") |
| def test_disconnection_response(self): |
| """ |
| Verify that the IUT responds correctly to reception of a Disconnection Request. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert() |
| cert_channel.disconnect_and_verify() |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-10-C", pts_test_name="Security - Insufficient Authentication – Initiator") |
| def test_security_insufficient_authentication_initiator(self): |
| """ |
| Verify that the IUT does not establish the channel upon receipt of an LE Credit Based Connection Response indicating the connection was refused with Result “0x0005 – Connection Refused – Insufficient Authentication". |
| """ |
| self._setup_link_from_cert() |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33) |
| self.cert_l2cap.verify_and_respond_open_channel_from_remote( |
| psm=0x33, result=LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION) |
| assertThat(response_future.get_status()).isEqualTo( |
| LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION) |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-11-C", pts_test_name="Security - Insufficient Authentication – Responder") |
| def test_security_insufficient_authentication_responder(self): |
| """ |
| Verify that an IUT refuses to create a connection upon reception of an LE Credit Based Connection |
| Request which fails to satisfy authentication requirements. |
| """ |
| self._setup_link_from_cert() |
| psm = 0x33 |
| self.dut_l2cap.register_coc(self.cert_address, psm, SecurityLevel.AUTHENTICATED_PAIRING_WITH_ENCRYPTION) |
| self.cert_l2cap.open_channel_with_expected_result( |
| psm, LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION) |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-12-C", pts_test_name="Security - Insufficient Authorization – Initiator") |
| def test_security_insufficient_authorization_initiator(self): |
| """ |
| Verify that the IUT does not establish the channel upon receipt of an LE Credit Based Connection Response indicating the connection was refused with Result “0x0006 – Connection Refused – Insufficient Authorization”. |
| """ |
| self._setup_link_from_cert() |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33) |
| self.cert_l2cap.verify_and_respond_open_channel_from_remote( |
| psm=0x33, result=LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION) |
| assertThat(response_future.get_status()).isEqualTo( |
| LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION) |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-13-C", pts_test_name="Security - Insufficient Authorization – Responder") |
| def test_security_insufficient_authorization_responder(self): |
| """ |
| Verify that an IUT refuses to create a connection upon reception of an LE Credit Based Connection |
| Request which fails to satisfy authentication requirements. |
| """ |
| self._setup_link_from_cert() |
| psm = 0x33 |
| self.dut_l2cap.register_coc(self.cert_address, psm, SecurityLevel.AUTHORIZATION) |
| self.cert_l2cap.open_channel_with_expected_result( |
| psm, LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION) |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BV-14-C", pts_test_name="Security - Insufficient Key Size – Initiator") |
| def test_security_insufficient_key_size_initiator(self): |
| """ |
| Verify that the IUT does not establish the channel upon receipt of an |
| LE Credit Based Connection Response indicating the connection was |
| refused with Result "0x0007 – Connection Refused – Insufficient |
| Encryption Key Size". |
| """ |
| self._setup_link_from_cert() |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33) |
| self.cert_l2cap.verify_and_respond_open_channel_from_remote( |
| psm=0x33, result=LeCreditBasedConnectionResponseResult.INSUFFICIENT_ENCRYPTION_KEY_SIZE) |
| assertThat(response_future.get_status()).isEqualTo( |
| LeCreditBasedConnectionResponseResult.INSUFFICIENT_ENCRYPTION_KEY_SIZE) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-15-C", pts_test_name="Security - Insufficient Encryption Key Size – Responder") |
| def test_security_insufficient_encryption_key_size_responder(self): |
| """ |
| Verify that an IUT refuses to create a connection upon receipt of an LE Credit Based Connection |
| Request which fails to satisfy Encryption Key Size requirements. |
| """ |
| self._setup_link_from_cert() |
| psm = 0x33 |
| self.dut_l2cap.register_coc(self.cert_address, psm, SecurityLevel.AUTHENTICATED_PAIRING_WITH_128_BIT_KEY) |
| self.cert_l2cap.open_channel_with_expected_result( |
| psm, LeCreditBasedConnectionResponseResult.INSUFFICIENT_ENCRYPTION_KEY_SIZE) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-16-C", |
| pts_test_name="LE Credit Based Connection Request - refuse due to insufficient resources - Initiator") |
| def test_le_connection_request_insufficient_resources_initiator(self): |
| """ |
| Verify that an IUT sending an LE Credit Based Connection Request does |
| not establish the channel upon receiving an LE Credit Based Connection |
| Response refusing the connection with result "0x0004 – Connection |
| refused – no resources available". |
| """ |
| self._setup_link_from_cert() |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33) |
| self.cert_l2cap.verify_and_respond_open_channel_from_remote( |
| psm=0x33, result=LeCreditBasedConnectionResponseResult.NO_RESOURCES_AVAILABLE) |
| assertThat(response_future.get_status()).isEqualTo(LeCreditBasedConnectionResponseResult.NO_RESOURCES_AVAILABLE) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-18-C", |
| pts_test_name="LE Credit Based Connection Request - refused due to Invalid Source CID - Initiator") |
| def test_request_refused_due_to_invalid_source_cid_initiator(self): |
| """ |
| Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x0009 – Connection refused – Invalid Source CID". |
| """ |
| self._setup_link_from_cert() |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33) |
| self.cert_l2cap.verify_and_respond_open_channel_from_remote( |
| psm=0x33, result=LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID) |
| assertThat(response_future.get_status()).isEqualTo(LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-19-C", |
| pts_test_name="LE Credit Based Connection Request - refused due to source CID already allocated - Initiator") |
| def test_request_refused_due_to_source_cid_already_allocated_initiator(self): |
| """ |
| Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x000A – Connection refused – Source CID already allocated". |
| """ |
| self._setup_link_from_cert() |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33) |
| self.cert_l2cap.verify_and_respond_open_channel_from_remote( |
| psm=0x33, result=LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED) |
| assertThat(response_future.get_status()).isEqualTo( |
| LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-20-C", |
| pts_test_name="LE Credit Based Connection Response - refused due to Source CID already allocated - Responder") |
| def test_request_refused_due_to_source_cid_already_allocated_responder(self): |
| """ |
| Verify that an IUT receiving an LE Credit Based Connection Request for a second channel will refuse the connection with result "0x000A - Connection refused – Source CID already allocated" if it receives a Source CID which is already in use. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert(psm=0x33, scid=0x0101) |
| self.dut_l2cap.register_coc(self.cert_address, psm=0x35) |
| self.cert_l2cap.get_control_channel().send( |
| l2cap_packets.LeCreditBasedConnectionRequestBuilder(2, 0x35, 0x0101, 1000, 1000, 1000)) |
| assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.CreditBasedConnectionResponseUsedCid()) |
| |
| @metadata( |
| pts_test_id="L2CAP/LE/CFC/BV-21-C", |
| pts_test_name="LE Credit Based Connection Request - refused due to Unacceptable Parameters - Initiator") |
| def test_request_refused_due_to_unacceptable_parameters_initiator(self): |
| """ |
| Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x000B – Connection refused – Unacceptable Parameters". |
| """ |
| self._setup_link_from_cert() |
| response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33) |
| self.cert_l2cap.verify_and_respond_open_channel_from_remote( |
| psm=0x33, result=LeCreditBasedConnectionResponseResult.UNACCEPTABLE_PARAMETERS) |
| assertThat(response_future.get_status()).isEqualTo( |
| LeCreditBasedConnectionResponseResult.UNACCEPTABLE_PARAMETERS) |
| |
| @metadata(pts_test_id="L2CAP/LE/CFC/BI-01-C", pts_test_name="Credit Exchange – Exceed Initial Credits") |
| def test_credit_exchange_exceed_initial_credits(self): |
| """ |
| Verify that the IUT disconnects the LE Data Channel when the credit count exceeds 65535. |
| """ |
| self._setup_link_from_cert() |
| (dut_channel, cert_channel) = self._open_channel_from_cert() |
| cert_channel.send_credits(65535) |
| cert_channel.verify_disconnect_request() |