| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """ |
| Setup: |
| This test only requires two fuchsia devices. |
| """ |
| |
| from acts import signals |
| from acts.base_test import BaseTestClass |
| from acts.test_decorators import test_tracker_info |
| from acts_contrib.test_utils.bt.bt_test_utils import generate_id_by_size |
| from acts_contrib.test_utils.fuchsia.bt_test_utils import bredr_scan_for_device_by_name |
| from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name |
| from acts_contrib.test_utils.fuchsia.bt_test_utils import unbond_all_known_devices |
| from acts_contrib.test_utils.fuchsia.bt_test_utils import verify_device_state_by_name |
| import time |
| |
| |
| class BtFuchsiaEPTest(BaseTestClass): |
| ble_advertise_interval = 50 |
| scan_timeout_seconds = 60 |
| default_iterations = 1000 |
| adv_name = generate_id_by_size(10) |
| test_adv_data = { |
| "name": adv_name, |
| "appearance": None, |
| "service_data": None, |
| "tx_power_level": None, |
| "service_uuids": None, |
| "manufacturer_data": None, |
| "uris": None, |
| } |
| test_connectable = True |
| test_scan_response = None |
| |
| def setup_class(self): |
| super().setup_class() |
| for fd in self.fuchsia_devices: |
| fd.bts_lib.initBluetoothSys() |
| self.pri_dut = self.fuchsia_devices[0] |
| self.sec_dut = self.fuchsia_devices[1] |
| |
| def on_fail(self, test_name, begin_time): |
| for fd in self.fuchsia_devices: |
| fd.take_bug_report(test_name, begin_time) |
| self._unbond_all_known_devices() |
| self.sec_dut.ble_lib.bleStopBleAdvertising() |
| self._kill_media_services() |
| |
| def teardown_class(self): |
| self._kill_media_services() |
| |
| def _kill_media_services(self): |
| """Kill any BT services related to A2DP/AVRCP on all Fuchsia devices. |
| """ |
| ssh_timeout = 30 |
| for fd in self.fuchsia_devices: |
| fd.send_command_ssh("killall bt-a2dp*", |
| timeout=ssh_timeout, |
| skip_status_code_check=True) |
| fd.send_command_ssh("killall bt-avrcp*", |
| timeout=ssh_timeout, |
| skip_status_code_check=True) |
| |
| def _unbond_all_known_devices(self): |
| """For all Fuchsia devices, unbond any known pairings. |
| """ |
| time.sleep(5) |
| for fd in self.fuchsia_devices: |
| unbond_all_known_devices(fd, self.log) |
| |
| def test_ble_awareness(self): |
| """Verify that Fuchsia devices can advertise and scan each other |
| |
| Verify a Fuchsia device that starts a BLE advertisesement can be |
| found by a Fuchsia BLE scanner. |
| |
| Steps: |
| 1. On one Fuchsia device set an advertisement |
| 2. On one Fuchsia device, scan for the advertisement by name |
| |
| Expected Result: |
| Verify that there are no errors after each GATT connection. |
| |
| Returns: |
| signals.TestPass if no errors |
| signals.TestFailure if there are any errors during the test. |
| |
| TAGS: BLE |
| Priority: 0 |
| """ |
| |
| self.sec_dut.ble_lib.bleStartBleAdvertising( |
| self.test_adv_data, self.test_scan_response, |
| self.ble_advertise_interval, self.test_connectable) |
| |
| device = le_scan_for_device_by_name(self.pri_dut, self.log, |
| self.adv_name, |
| self.scan_timeout_seconds) |
| self.sec_dut.ble_lib.bleStopBleAdvertising() |
| if device is None: |
| raise signals.TestFailure("Scanner unable to find advertisement.") |
| raise signals.TestPass("Success") |
| |
| def test_gatt_central_peripheral(self): |
| """Verify that Fuchsia devices can perform GATT operations |
| |
| Verify a Fuchsia devices can perform GATT connections and interactions. |
| |
| Steps: |
| 1. On one Fuchsia device set an advertisement |
| 2. On one Fuchsia device, scan for the advertisement by name |
| 3. Perform GATT connection over LE |
| 4. Pair both devices. |
| 5. Perform GATT read/write operations. |
| 6. Perform GATT disconnection. |
| |
| Expected Result: |
| Verify that there are no errors after each GATT connection. |
| |
| Returns: |
| signals.TestPass if no errors |
| signals.TestFailure if there are any errors during the test. |
| |
| TAGS: BLE |
| Priority: 0 |
| """ |
| self._unbond_all_known_devices() |
| |
| source_device_name = generate_id_by_size(10) |
| self.pri_dut.bts_lib.setName(source_device_name) |
| |
| self.sec_dut.ble_lib.bleStartBleAdvertising( |
| self.test_adv_data, self.test_scan_response, |
| self.ble_advertise_interval, self.test_connectable) |
| |
| device = le_scan_for_device_by_name(self.pri_dut, self.log, |
| self.adv_name, |
| self.scan_timeout_seconds) |
| if device is None: |
| raise signals.TestFailure("Scanner unable to find advertisement.") |
| |
| connect_result = self.pri_dut.gattc_lib.bleConnectToPeripheral( |
| device["id"]) |
| if connect_result.get("error") is not None: |
| raise signals.TestFailure("GATT Connection failed with: {}".format( |
| connect_result.get("error"))) |
| |
| if not verify_device_state_by_name(self.pri_dut, self.log, |
| self.adv_name, "CONNECTED", None): |
| raise signals.TestFailure( |
| "Failed to connect to device {}.".format(target_device_name)) |
| |
| if not verify_device_state_by_name( |
| self.sec_dut, self.log, source_device_name, "CONNECTED", None): |
| raise signals.TestFailure( |
| "Failed to connect to device {}.".format(source_device_name)) |
| |
| security_level = "ENCRYPTED" |
| non_bondable = False |
| transport = 2 #LE |
| self.pri_dut.bts_lib.pair(device["id"], security_level, non_bondable, |
| transport) |
| |
| services = None |
| if not verify_device_state_by_name(self.pri_dut, self.log, |
| self.adv_name, "BONDED", services): |
| raise signals.TestFailure( |
| "Failed to pair device {}.".format(target_device_name)) |
| |
| if not verify_device_state_by_name(self.sec_dut, self.log, |
| source_device_name, "BONDED", |
| services): |
| raise signals.TestFailure( |
| "Failed to pair device {}.".format(source_device_name)) |
| |
| disconnect_result = self.pri_dut.gattc_lib.bleDisconnectPeripheral( |
| device["id"]) |
| if disconnect_result.get("error") is not None: |
| raise signals.TestFailure( |
| "GATT Disconnection failed with: {}".format( |
| connect_result.get("error"))) |
| |
| self.sec_dut.ble_lib.bleStopBleAdvertising() |
| |
| # TODO: Setup Proper GATT server and verify services published are found |
| |
| raise signals.TestPass("Success") |
| |
| def test_pairing_a2dp(self): |
| """Verify that Fuchsia devices can pair to each other and establish |
| an A2DP connection |
| |
| Verify that Fuchsia devices can pair to each other and establish |
| an A2DP connection |
| |
| Steps: |
| 1. Clear out all bonded devices |
| 2. Stop any A2DP services running on the device |
| Needed to take ownership of the services |
| 3. Init sink and source opposite devices |
| 4. Start pairing delegate for all Fuchsia devices |
| 5. Set sink device to be discoverable |
| 6. Discover sink device from source device |
| 7. Connect to sink device from source device |
| 8. Pair to sink device |
| 9. Validate paired devices and services present |
| |
| Expected Result: |
| Verify devices are successfully paired and appropriate a2dp |
| services are running. |
| |
| Returns: |
| signals.TestPass if no errors |
| signals.TestFailure if there are any errors during the test. |
| |
| TAGS: BREDR, A2DP |
| Priority: 0 |
| """ |
| self._unbond_all_known_devices() |
| self._kill_media_services() |
| |
| source_device_name = generate_id_by_size(10) |
| target_device_name = generate_id_by_size(10) |
| |
| self.pri_dut.bts_lib.setName(source_device_name) |
| self.sec_dut.bts_lib.setName(target_device_name) |
| |
| input_capabilities = "NONE" |
| output_capabilities = "NONE" |
| |
| # Initialize a2dp on both devices. |
| self.pri_dut.avdtp_lib.init() |
| self.sec_dut.avdtp_lib.init() |
| |
| self.pri_dut.bts_lib.acceptPairing(input_capabilities, |
| output_capabilities) |
| |
| self.sec_dut.bts_lib.acceptPairing(input_capabilities, |
| output_capabilities) |
| self.sec_dut.bts_lib.setDiscoverable(True) |
| |
| unique_mac_addr_id = bredr_scan_for_device_by_name( |
| self.pri_dut, self.log, target_device_name, |
| self.scan_timeout_seconds) |
| |
| if not unique_mac_addr_id: |
| raise signals.TestFailure( |
| "Failed to find device {}.".format(target_device_name)) |
| |
| connect_result = self.pri_dut.bts_lib.connectDevice(unique_mac_addr_id) |
| if connect_result.get("error") is not None: |
| raise signals.TestFailure("Failed to connect with {}.".format( |
| connect_result.get("error"))) |
| |
| # We pair before checking the CONNECTED status because BR/EDR semantics |
| # were recently changed such that if pairing is not confirmed, then bt |
| # does not report connected = True. |
| security_level = "NONE" |
| bondable = True |
| transport = 1 #BREDR |
| pair_result = self.pri_dut.bts_lib.pair(unique_mac_addr_id, |
| security_level, bondable, |
| transport) |
| if pair_result.get("error") is not None: |
| raise signals.TestFailure("Failed to pair with {}.".format( |
| pair_result.get("error"))) |
| |
| if not verify_device_state_by_name( |
| self.pri_dut, self.log, target_device_name, "CONNECTED", None): |
| raise signals.TestFailure( |
| "Failed to connect to device {}.".format(target_device_name)) |
| |
| if not verify_device_state_by_name( |
| self.sec_dut, self.log, source_device_name, "CONNECTED", None): |
| raise signals.TestFailure( |
| "Failed to connect to device {}.".format(source_device_name)) |
| |
| #TODO: Validation of services and paired devices (b/175641870) |
| # A2DP sink: 0000110b-0000-1000-8000-00805f9b34fb |
| # A2DP source: 0000110a-0000-1000-8000-00805f9b34fb |
| #TODO: Make an easy function for checking/updating devices |
| services = None |
| if not verify_device_state_by_name(self.pri_dut, self.log, |
| target_device_name, "BONDED", |
| services): |
| raise signals.TestFailure( |
| "Failed to pair device {}.".format(target_device_name)) |
| |
| if not verify_device_state_by_name(self.sec_dut, self.log, |
| source_device_name, "BONDED", |
| services): |
| raise signals.TestFailure( |
| "Failed to pair device {}.".format(source_device_name)) |
| |
| raise signals.TestPass("Success") |