| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2017 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """ |
| This test script exercises set PHY and read PHY procedures. |
| """ |
| |
| from queue import Empty |
| |
| from acts.test_decorators import test_tracker_info |
| from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest |
| from acts_contrib.test_utils.bt.GattConnectedBaseTest import GattConnectedBaseTest |
| from acts_contrib.test_utils.bt.bt_constants import gatt_connection_priority |
| from acts_contrib.test_utils.bt.bt_constants import gatt_event |
| from acts_contrib.test_utils.bt.bt_constants import gatt_phy |
| from acts import signals |
| |
| CONNECTION_PRIORITY_HIGH = gatt_connection_priority['high'] |
| PHY_LE_1M = gatt_phy['1m'] |
| PHY_LE_2M = gatt_phy['2m'] |
| |
| |
| def lfmt(txPhy, rxPhy): |
| return '(' + list(gatt_phy.keys())[list(gatt_phy.values()).index( |
| txPhy)] + ', ' + list(gatt_phy.keys())[list(gatt_phy.values()).index( |
| rxPhy)] + ')' |
| |
| |
| class PhyTest(GattConnectedBaseTest): |
| def setup_class(self): |
| super(PhyTest, self).setup_class() |
| if not self.cen_ad.droid.bluetoothIsLe2MPhySupported(): |
| raise signals.TestAbortClass( |
| "Central device does not support LE 2M PHY") |
| |
| if not self.per_ad.droid.bluetoothIsLe2MPhySupported(): |
| raise signals.TestAbortClass( |
| "Peripheral device does not support LE 2M PHY") |
| |
| # Some controllers auto-update PHY to 2M, and both client and server |
| # might receive PHY Update event right after connection, if the |
| # connection was established over 1M PHY. We will ignore this event, but |
| # must pop it from queue. |
| def pop_initial_phy_update(self): |
| try: |
| maybe_event = gatt_event['phy_update']['evt'].format( |
| self.gatt_callback) |
| self.cen_ad.ed.pop_event(maybe_event, 0) |
| except Empty: |
| pass |
| |
| try: |
| maybe_event = gatt_event['serv_phy_update']['evt'].format( |
| self.gatt_server_callback) |
| self.per_ad.ed.pop_event(maybe_event, 0) |
| except Empty: |
| pass |
| |
| # this helper method checks wether both client and server received PHY |
| # update event with proper txPhy and rxPhy |
| def ensure_both_updated_phy(self, clientTxPhy, clientRxPhy): |
| event = self._client_wait(gatt_event['phy_update']) |
| txPhy = event['data']['TxPhy'] |
| rxPhy = event['data']['RxPhy'] |
| self.log.info("\tClient PHY updated: " + lfmt(txPhy, rxPhy)) |
| self.assertEqual(0, event['data']['Status'], "Status should be 0") |
| self.assertEqual(clientTxPhy, event['data']['TxPhy']) |
| self.assertEqual(clientRxPhy, event['data']['RxPhy']) |
| |
| bt_device_id = 0 |
| event = self._server_wait(gatt_event['serv_phy_update']) |
| txPhy = event['data']['TxPhy'] |
| rxPhy = event['data']['RxPhy'] |
| self.log.info("\tServer PHY updated: " + lfmt(txPhy, rxPhy)) |
| self.assertEqual(0, event['data']['Status'], "Status should be 0") |
| self.assertEqual(clientRxPhy, event['data']['TxPhy']) |
| self.assertEqual(clientTxPhy, event['data']['RxPhy']) |
| |
| # read the client phy, return (txPhy, rxPhy) |
| def read_client_phy(self): |
| self.cen_ad.droid.gattClientReadPhy(self.bluetooth_gatt) |
| event = self._client_wait(gatt_event['phy_read']) |
| self.assertEqual(0, event['data']['Status'], "Status should be 0") |
| return (event['data']['TxPhy'], event['data']['RxPhy']) |
| |
| # read the server phy, return (txPhy, rxPhy) |
| def read_server_phy(self): |
| bt_device_id = 0 |
| self.per_ad.droid.gattServerReadPhy(self.gatt_server, bt_device_id) |
| event = self._server_wait(gatt_event['serv_phy_read']) |
| self.assertEqual(0, event['data']['Status'], "Status should be 0") |
| return (event['data']['TxPhy'], event['data']['RxPhy']) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| @test_tracker_info(uuid='edb95ae1-97e5-4337-9a60-1e113aa43a4d') |
| def test_phy_read(self): |
| """Test LE read PHY. |
| |
| Test LE read PHY. |
| |
| Steps: |
| 1. Central, Peripheral : read PHY, make sure values are same. |
| 2. Central: update PHY. |
| 3. Ensure both Central and Peripheral received PHY update event. |
| 4. Central, Peripheral: read PHY, make sure values are same. |
| |
| Expected Result: |
| Verify that read PHY works properly. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, PHY |
| Priority: 0 |
| """ |
| self.cen_ad.droid.gattClientRequestConnectionPriority( |
| self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH) |
| self.pop_initial_phy_update() |
| |
| # read phy from client and server, make sure they're same |
| cTxPhy, cRxPhy = self.read_client_phy() |
| sTxPhy, sRxPhy = self.read_server_phy() |
| self.assertEqual(cTxPhy, sTxPhy) |
| self.assertEqual(cRxPhy, sRxPhy) |
| |
| self.log.info("Initial connection PHY was: " + lfmt(cTxPhy, cRxPhy)) |
| |
| nextTxPhy = (cTxPhy == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M |
| nextRxPhy = (cRxPhy == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M |
| |
| # try to update PHY from Client |
| self.log.info("Will try to set PHY to: " + lfmt(nextTxPhy, nextRxPhy)) |
| self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt, |
| nextTxPhy, nextRxPhy, 0) |
| self.ensure_both_updated_phy(nextTxPhy, nextRxPhy) |
| |
| # read phy on client and server, make sure values are same and equal |
| # the newly set value |
| cTxPhy, cRxPhy = self.read_client_phy() |
| sTxPhy, sRxPhy = self.read_server_phy() |
| self.assertEqual(cTxPhy, sTxPhy) |
| self.assertEqual(cRxPhy, sRxPhy) |
| |
| self.assertEqual(nextTxPhy, cTxPhy) |
| self.assertEqual(nextRxPhy, cRxPhy) |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| @test_tracker_info(uuid='6b66af0a-35eb-42af-acd5-9634684f275d') |
| def test_phy_change_20_times(self): |
| """Test PHY update. |
| |
| Test LE PHY update. |
| |
| Steps: |
| 1. Central: read PHY. |
| 2. Central: update PHY to 1M, 2M, 1M... 20 times, each time ensuring |
| both client and server received PHY update event. |
| |
| Expected Result: |
| Verify that read update PHY worked properly each time. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, PHY |
| Priority: 0 |
| """ |
| self.cen_ad.droid.gattClientRequestConnectionPriority( |
| self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH) |
| self.pop_initial_phy_update() |
| |
| txPhyB, rxPhyB = self.read_client_phy() |
| txPhyA = (txPhyB == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M |
| rxPhyA = (rxPhyB == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M |
| |
| self.log.info("Initial connection PHY was: " + lfmt(txPhyB, rxPhyB)) |
| |
| for i in range(20): |
| #swap values between iterations |
| txPhy = (i & 1) and txPhyB or txPhyA |
| rxPhy = (i & 1) and rxPhyB or rxPhyA |
| |
| self.log.info("Will try to set PHY to: " + lfmt(txPhy, rxPhy)) |
| self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt, |
| txPhy, rxPhy, 0) |
| self.ensure_both_updated_phy(txPhy, rxPhy) |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| @test_tracker_info(uuid='13f28de4-07f4-458c-a3e5-3ba95318616f') |
| def test_phy_change_asym(self): |
| """Test PHY update with asymetric rx and tx PHY. |
| |
| Test PHY update with asymetric rx and tx PHY. |
| |
| Steps: |
| 1. Central: read PHY. |
| 2. Central: update PHY to tx: 1M, rx: 2M, ensure both devices received |
| the asymetric update. |
| 3. Central: update PHY to tx: 2M, rx: 1M, ensure both devices received |
| the asymetric update. |
| |
| Expected Result: |
| Verify that read update PHY worked properly each time. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, PHY |
| Priority: 0 |
| """ |
| self.cen_ad.droid.gattClientRequestConnectionPriority( |
| self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH) |
| self.pop_initial_phy_update() |
| |
| txPhy, rxPhy = self.read_client_phy() |
| |
| self.log.info("Initial connection PHY was: " + lfmt(txPhy, rxPhy)) |
| self.log.info("will try to set PHY to: PHY_LE_1M, PHY_LE_2M") |
| |
| #try to update PHY to tx 1M, rx 2M from Client |
| self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt, |
| PHY_LE_1M, PHY_LE_2M, 0) |
| self.ensure_both_updated_phy(PHY_LE_1M, PHY_LE_2M) |
| |
| #try to update PHY to TX 2M, RX 1M from Client |
| self.log.info("will try to set PHY to: PHY_LE_2M, PHY_LE_1M") |
| self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt, |
| PHY_LE_2M, PHY_LE_1M, 0) |
| self.ensure_both_updated_phy(PHY_LE_2M, PHY_LE_1M) |
| |
| return True |