| import hci_packets as hci |
| import link_layer_packets as ll |
| import llcp_packets as llcp |
| import random |
| import unittest |
| from hci_packets import ErrorCode |
| from py.bluetooth import Address |
| from py.controller import ControllerTest |
| |
| |
class Test(ControllerTest):
    """LL/CIS/CEN/BV-01-C: CIS Setup Procedure, Central Initiated.

    The test connects to a peer as LE central, then runs the CIS setup
    sequence twice on the same CIG/CIS identifiers:

    * steps 1-14: CIS with SDUs in both directions, one ISO SDU sent from
      the host to the peer, CIS disconnected and CIG removed;
    * steps 15-28: CIS with Max_SDU_C_To_P = 0 (the LL_CIS_REQ must carry
      Max_SDU_C_To_P = 0 and BN_C_To_P = 0), one ISO SDU received from the
      peer and forwarded to the host.

    The setup sequence shared by both halves lives in the private
    ``_``-prefixed helpers below; their names do not start with ``test`` so
    they are not picked up as test cases.
    """

    # Parameters for HCI_LE_Set_CIG_Parameters.
    SDU_Interval_C_TO_P = 10000  # 10ms
    SDU_Interval_P_TO_C = 10000  # 10ms
    ISO_Interval = 16  # 20ms (not referenced inside this class)
    Worst_Case_SCA = hci.ClockAccuracy.PPM_500
    Packing = hci.Packing.SEQUENTIAL
    Framing = hci.Enable.ENABLED
    NSE = 4  # (not referenced inside this class)
    Max_SDU_C_TO_P = 16
    Max_SDU_P_TO_C = 16
    PHY_C_TO_P = 0x1
    PHY_P_TO_C = 0x1
    Max_Transport_Latency_C_TO_P = 60000  # 60ms
    Max_Transport_Latency_P_TO_C = 60000  # 60ms
    RTN_C_TO_P = 3
    RTN_P_TO_C = 3

    async def _set_cig_parameters(self, cig_id: int, cis_id: int, cis_connection_handle: int, max_sdu_c_to_p: int):
        """Send HCI_LE_Set_CIG_Parameters for a single CIS and expect success.

        All parameters come from the class constants except
        ``max_sdu_c_to_p``, which differs between the two halves of the test.
        The command complete event must report exactly one connection
        handle, equal to ``cis_connection_handle``.
        """
        cis_parameters = hci.CisParametersConfig(cis_id=cis_id,
                                                 max_sdu_c_to_p=max_sdu_c_to_p,
                                                 max_sdu_p_to_c=self.Max_SDU_P_TO_C,
                                                 phy_c_to_p=self.PHY_C_TO_P,
                                                 phy_p_to_c=self.PHY_P_TO_C,
                                                 rtn_c_to_p=self.RTN_C_TO_P,
                                                 rtn_p_to_c=self.RTN_P_TO_C)
        self.controller.send_cmd(
            hci.LeSetCigParameters(cig_id=cig_id,
                                   sdu_interval_c_to_p=self.SDU_Interval_C_TO_P,
                                   sdu_interval_p_to_c=self.SDU_Interval_P_TO_C,
                                   worst_case_sca=self.Worst_Case_SCA,
                                   max_transport_latency_c_to_p=self.Max_Transport_Latency_C_TO_P,
                                   max_transport_latency_p_to_c=self.Max_Transport_Latency_P_TO_C,
                                   packing=self.Packing,
                                   framing=self.Framing,
                                   cis_config=[cis_parameters]))

        await self.expect_evt(
            hci.LeSetCigParametersComplete(status=ErrorCode.SUCCESS,
                                           num_hci_command_packets=1,
                                           cig_id=cig_id,
                                           connection_handle=[cis_connection_handle]))

    async def _create_cis(self, cis_connection_handle: int, acl_connection_handle):
        """Send HCI_LE_Create_CIS for one CIS and expect a success status."""
        self.controller.send_cmd(
            hci.LeCreateCis(cis_config=[
                hci.LeCreateCisConfig(cis_connection_handle=cis_connection_handle,
                                      acl_connection_handle=acl_connection_handle)
            ]))

        await self.expect_evt(hci.LeCreateCisStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

    async def _expect_cis_req(self, peer_address: Address, cig_id: int, cis_id: int, max_sdu_c_to_p, bn_c_to_p):
        """Expect an LL_CIS_REQ from the IUT and return what expect_llcp yields.

        ``max_sdu_c_to_p`` and ``bn_c_to_p`` are the expected values for the
        corresponding fields; the caller passes ``self.Any`` (presumably a
        match-anything placeholder provided by ControllerTest) when a field
        is not constrained. The returned object's fields are reused by the
        later LL_CIS_RSP and HCI_LE_CIS_Established steps.
        """
        return await self.expect_llcp(source_address=self.controller.address,
                                      destination_address=peer_address,
                                      expected_pdu=llcp.CisReq(cig_id=cig_id,
                                                               cis_id=cis_id,
                                                               phy_c_to_p=hci.PhyType.LE_1M,
                                                               phy_p_to_c=hci.PhyType.LE_1M,
                                                               framed=self.Framing == hci.Enable.ENABLED,
                                                               max_sdu_c_to_p=max_sdu_c_to_p,
                                                               max_sdu_p_to_c=self.Max_SDU_P_TO_C,
                                                               sdu_interval_c_to_p=self.Any,
                                                               sdu_interval_p_to_c=self.Any,
                                                               max_pdu_c_to_p=self.Any,
                                                               max_pdu_p_to_c=self.Any,
                                                               nse=self.Any,
                                                               sub_interval=self.Any,
                                                               bn_c_to_p=bn_c_to_p,
                                                               bn_p_to_c=self.Any,
                                                               ft_c_to_p=self.Any,
                                                               ft_p_to_c=self.Any,
                                                               iso_interval=self.Any,
                                                               cis_offset_min=self.Any,
                                                               cis_offset_max=self.Any,
                                                               conn_event_count=0))

    def _send_cis_rsp(self, peer_address: Address, cis_req):
        """Send an LL_CIS_RSP from the peer, echoing the offsets of ``cis_req``."""
        self.controller.send_llcp(source_address=peer_address,
                                  destination_address=self.controller.address,
                                  pdu=llcp.CisRsp(cis_offset_min=cis_req.cis_offset_min,
                                                  cis_offset_max=cis_req.cis_offset_max,
                                                  conn_event_count=0))

    async def _expect_cis_ind(self, peer_address: Address):
        """Expect an LL_CIS_IND from the IUT and return what expect_llcp yields.

        Only ``aa`` and ``conn_event_count`` are pinned (both to 0); the
        offset and sync delays are left unconstrained via ``self.Any``.
        """
        return await self.expect_llcp(source_address=self.controller.address,
                                      destination_address=peer_address,
                                      expected_pdu=llcp.CisInd(aa=0,
                                                               cis_offset=self.Any,
                                                               cig_sync_delay=self.Any,
                                                               cis_sync_delay=self.Any,
                                                               conn_event_count=0))

    async def _expect_cis_established(self, cis_connection_handle: int, cis_req, cis_ind):
        """Expect a successful HCI_LE_CIS_Established event.

        The event parameters must agree with the values previously seen on
        the link layer: sync delays from ``cis_ind``; NSE, BN, FT, Max_PDU
        and ISO_Interval from ``cis_req``.
        """
        await self.expect_evt(
            hci.LeCisEstablished(status=ErrorCode.SUCCESS,
                                 connection_handle=cis_connection_handle,
                                 cig_sync_delay=cis_ind.cig_sync_delay,
                                 cis_sync_delay=cis_ind.cis_sync_delay,
                                 transport_latency_c_to_p=self.Any,
                                 transport_latency_p_to_c=self.Any,
                                 phy_c_to_p=hci.SecondaryPhyType.LE_1M,
                                 phy_p_to_c=hci.SecondaryPhyType.LE_1M,
                                 nse=cis_req.nse,
                                 bn_c_to_p=cis_req.bn_c_to_p,
                                 bn_p_to_c=cis_req.bn_p_to_c,
                                 ft_c_to_p=cis_req.ft_c_to_p,
                                 ft_p_to_c=cis_req.ft_p_to_c,
                                 max_pdu_c_to_p=cis_req.max_pdu_c_to_p,
                                 max_pdu_p_to_c=cis_req.max_pdu_p_to_c,
                                 iso_interval=cis_req.iso_interval))

    async def _setup_iso_output_data_path(self, cis_connection_handle: int):
        """Send HCI_LE_Setup_ISO_Data_Path (direction OUTPUT) and expect success."""
        self.controller.send_cmd(
            hci.LeSetupIsoDataPath(
                connection_handle=cis_connection_handle,
                data_path_direction=hci.DataPathDirection.OUTPUT,
                data_path_id=0,
                codec_id=0,
                controller_delay=0,
                codec_configuration=[],
            ))

        await self.expect_evt(
            hci.LeSetupIsoDataPathComplete(status=ErrorCode.SUCCESS,
                                           num_hci_command_packets=1,
                                           connection_handle=cis_connection_handle))

    # LL/CIS/CEN/BV-01-C [CIS Setup Procedure, Central Initiated]
    async def test(self):
        # Test parameters.
        cig_id = 0x12
        cis_id = 0x42
        cis_connection_handle = 0xe00
        peer_address = Address('aa:bb:cc:dd:ee:ff')
        controller = self.controller

        # Enable Connected Isochronous Stream Host Support.
        await self.enable_connected_isochronous_stream_host_support()

        # Prelude: Establish an ACL connection as central with the IUT.
        acl_connection_handle = await self.establish_le_connection_central(peer_address)

        # 1. The Upper Tester sends an HCI_LE_Set_CIG_Parameters command to the IUT with
        # Max_SDU_C_To_P and Max_SDU_P_To_C both set to 16, other parameters set to default, but
        # with PHY and latency specified in Table 4.142 and framing enabled. The Upper Tester receives a
        # success response from the IUT with CIS_Count = 1.
        await self._set_cig_parameters(cig_id,
                                       cis_id,
                                       cis_connection_handle,
                                       max_sdu_c_to_p=self.Max_SDU_C_TO_P)

        # 2. The Upper Tester sends an HCI_LE_Create_CIS command to create a single CIS and receives a
        # success response from the IUT.
        await self._create_cis(cis_connection_handle, acl_connection_handle)

        # 3. The Lower Tester receives an LL_CIS_REQ PDU from the IUT with all fields set to valid values.
        # CIS_Offset_Min is a value between 500µs and TSPX_conn_interval, CIS_Offset_Max is a value
        # between CIS_Offset_Min and the CIS_Offset_Max value as calculated in [14] Section 2.4.2.29
        # using TSPX_conn_interval as the value of connInterval, and connEventCount is the reference
        # event anchor point for which the offsets applied.
        cis_req = await self._expect_cis_req(peer_address,
                                             cig_id,
                                             cis_id,
                                             max_sdu_c_to_p=self.Max_SDU_C_TO_P,
                                             bn_c_to_p=self.Any)

        # 4. The Lower Tester sends an LL_CIS_RSP PDU to the IUT.
        self._send_cis_rsp(peer_address, cis_req)

        # 5. The Lower Tester receives an LL_CIS_IND from the IUT where the CIS_Offset is the time (µs)
        # from the start of the ACL connection event in connEventCount to the first CIS anchor point, the
        # CIS_Sync_Delay is CIG_Sync_Delay minus the offset from the CIG reference point to the CIS
        # anchor point in µs, and the connEventCount is the CIS_Offset reference point.
        cis_ind = await self._expect_cis_ind(peer_address)

        # 6. The Upper Tester receives an HCI_LE_CIS_Established event indicating success, after the first
        # CIS packet sent by the Lower Tester. The Connection_Handle parameter is set to the value
        # provided in the HCI_LE_Create_CIS command.
        await self._expect_cis_established(cis_connection_handle, cis_req, cis_ind)

        # 7. The Upper Tester sends an HCI_LE_Setup_ISO_Data_Path command and receives a success
        # response from the IUT.
        # NOTE(review): step 8 sends data Host -> Controller, which the HCI specification calls the
        # Input data path, yet only the Output direction is configured here — confirm this is intended.
        await self._setup_iso_output_data_path(cis_connection_handle)

        # 8. The Upper Tester sends HCI ISO data packets over the CIS and the Lower Tester receives
        # framed ISO data.
        iso_sdu = [random.randint(1, 251) for _ in range(self.Max_SDU_C_TO_P)]
        controller.send_iso(
            hci.IsoWithoutTimestamp(
                connection_handle=cis_connection_handle,
                pb_flag=hci.IsoPacketBoundaryFlag.COMPLETE_SDU,
                packet_sequence_number=42,
                payload=iso_sdu,
            ))

        await self.expect_ll(
            ll.LeConnectedIsochronousPdu(source_address=controller.address,
                                         destination_address=peer_address,
                                         cig_id=cig_id,
                                         cis_id=cis_id,
                                         sequence_number=42,
                                         data=iso_sdu))

        # 9. The Upper Tester sends an HCI_Disconnect command to the IUT with Reason set to any valid
        # value and Connection_Handle set to the connection handle of the active CIS and receives a
        # successful HCI_Command_Status event in response.
        controller.send_cmd(
            hci.Disconnect(connection_handle=cis_connection_handle,
                           reason=ErrorCode.REMOTE_USER_TERMINATED_CONNECTION))

        await self.expect_evt(hci.DisconnectStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        # 10. The IUT sends an LL_CIS_TERMINATE_IND PDU to the Lower Tester, and the ErrorCode field
        # in the CtrData matches the Reason code value that the Upper Tester sent in step 9.
        await self.expect_llcp(source_address=controller.address,
                               destination_address=peer_address,
                               expected_pdu=llcp.CisTerminateInd(
                                   cig_id=cig_id,
                                   cis_id=cis_id,
                                   error_code=ErrorCode.REMOTE_USER_TERMINATED_CONNECTION))

        # 11. The Lower Tester sends an LL Ack to the IUT.
        # 12. The IUT sends an HCI_Disconnection_Complete event to the Upper Tester.
        await self.expect_evt(
            hci.DisconnectionComplete(status=ErrorCode.SUCCESS,
                                      connection_handle=cis_connection_handle,
                                      reason=ErrorCode.CONNECTION_TERMINATED_BY_LOCAL_HOST))

        # 13. The Upper Tester sends an HCI_LE_Remove_CIG command to the IUT with CIG_ID set to the
        # value of the current inactive CIG.
        controller.send_cmd(hci.LeRemoveCig(cig_id=cig_id))

        # 14. The IUT sends an HCI_Command_Complete event to the Upper Tester with Status set to 0x00
        # and CIG_ID set to the CIG_ID value in step 13.
        await self.expect_evt(
            hci.LeRemoveCigComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1, cig_id=cig_id))

        # 15. The Upper Tester sends an HCI_LE_Set_CIG_Parameters command to the IUT with default
        # parameters but with Max_SDU_C_To_P set to 0 and receives a success response from the IUT
        # with CIS_Count = 1.
        await self._set_cig_parameters(cig_id, cis_id, cis_connection_handle, max_sdu_c_to_p=0)

        # 16. The Upper Tester sends an HCI_LE_Create_CIS command to create a single CIS and receives a
        # success response from the IUT.
        await self._create_cis(cis_connection_handle, acl_connection_handle)

        # 17. The IUT sends an LL_CIS_REQ PDU to the Lower Tester with all fields set to valid values.
        # 18. The value of Max_SDU_C_To_P and BN_C_To_P in the CtrData of the LL_CIS_REQ PDU are
        # verified to be equal to 0. The test fails if the values are not equal to 0.
        cis_req = await self._expect_cis_req(peer_address, cig_id, cis_id, max_sdu_c_to_p=0, bn_c_to_p=0)

        # 19. The Lower Tester sends an LL_CIS_RSP PDU to the IUT.
        self._send_cis_rsp(peer_address, cis_req)

        # 20. The IUT sends an LL_CIS_IND to the Lower Tester.
        cis_ind = await self._expect_cis_ind(peer_address)

        # 21. The IUT sends an empty ISO Data Packet to the Lower Tester.
        # 22. The Lower Tester sends an LL Ack to the IUT.
        # 23. The IUT sends an HCI_LE_CIS_Established event to the Upper Tester. The Connection_Handle
        # parameter is set to the value provided in step 16.
        await self._expect_cis_established(cis_connection_handle, cis_req, cis_ind)

        # 24. The Upper Tester sends an HCI_LE_Setup_ISO_Data_Path command to the IUT with
        # Connection_Handle set to the value provided in step 16 and Data_Path_Direction set to Output.
        # 25. The IUT sends a successful HCI_Command_Complete event to the Upper Tester.
        await self._setup_iso_output_data_path(cis_connection_handle)

        # 26. The IUT sends a CIS Null PDU to the Lower Tester.
        # 27. The Lower Tester sends an ISO Data Packet to the IUT.
        # 28. Repeat steps 26 and 27, 50 times.
        # Only a single Lower Tester -> IUT SDU is exchanged below; the 50 repetitions are not performed.
        iso_sdu = [random.randint(1, 251) for _ in range(self.Max_SDU_P_TO_C)]
        controller.send_ll(
            ll.LeConnectedIsochronousPdu(source_address=peer_address,
                                         destination_address=controller.address,
                                         cig_id=cig_id,
                                         cis_id=cis_id,
                                         sequence_number=42,
                                         data=iso_sdu))

        await self.expect_iso(
            hci.IsoWithoutTimestamp(
                connection_handle=cis_connection_handle,
                pb_flag=hci.IsoPacketBoundaryFlag.COMPLETE_SDU,
                iso_sdu_length=len(iso_sdu),
                packet_sequence_number=42,
                payload=iso_sdu,
            ))