| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2018 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License |
| |
| import json |
| import logging |
| import math |
| import os |
| import re |
| import time |
| |
| from acts import asserts |
| from acts.controllers.ap_lib import hostapd_config |
| from acts.controllers.ap_lib import hostapd_constants |
| from acts.controllers.ap_lib import hostapd_security |
| from acts.controllers.utils_lib.ssh import connection |
| from acts.controllers.utils_lib.ssh import settings |
| from acts.controllers.iperf_server import IPerfResult |
| from acts.libs.proc import job |
| from acts_contrib.test_utils.bt.bt_constants import ( |
| bluetooth_profile_connection_state_changed) |
| from acts_contrib.test_utils.bt.bt_constants import bt_default_timeout |
| from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants |
| from acts_contrib.test_utils.bt.bt_constants import bt_profile_states |
| from acts_contrib.test_utils.bt.bt_constants import bt_scan_mode_types |
| from acts_contrib.test_utils.bt.bt_gatt_utils import GattTestUtilsError |
| from acts_contrib.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection |
| from acts_contrib.test_utils.bt.bt_test_utils import disable_bluetooth |
| from acts_contrib.test_utils.bt.bt_test_utils import enable_bluetooth |
| from acts_contrib.test_utils.bt.bt_test_utils import is_a2dp_src_device_connected |
| from acts_contrib.test_utils.bt.bt_test_utils import is_a2dp_snk_device_connected |
| from acts_contrib.test_utils.bt.bt_test_utils import is_hfp_client_device_connected |
| from acts_contrib.test_utils.bt.bt_test_utils import is_map_mce_device_connected |
| from acts_contrib.test_utils.bt.bt_test_utils import is_map_mse_device_connected |
| from acts_contrib.test_utils.bt.bt_test_utils import set_bt_scan_mode |
| from acts_contrib.test_utils.car.car_telecom_utils import wait_for_active |
| from acts_contrib.test_utils.car.car_telecom_utils import wait_for_dialing |
| from acts_contrib.test_utils.car.car_telecom_utils import wait_for_not_in_call |
| from acts_contrib.test_utils.car.car_telecom_utils import wait_for_ringing |
| from acts_contrib.test_utils.tel.tel_test_utils import get_phone_number |
| from acts_contrib.test_utils.tel.tel_voice_utils import hangup_call |
| from acts_contrib.test_utils.tel.tel_voice_utils import initiate_call |
| from acts_contrib.test_utils.tel.tel_test_utils import setup_droid_properties |
| from acts_contrib.test_utils.tel.tel_voice_utils import wait_and_answer_call |
| from acts_contrib.test_utils.wifi.wifi_power_test_utils import get_phone_ip |
| from acts_contrib.test_utils.wifi.wifi_test_utils import reset_wifi |
| from acts_contrib.test_utils.wifi.wifi_test_utils import wifi_connect |
| from acts_contrib.test_utils.wifi.wifi_test_utils import wifi_test_device_init |
| from acts_contrib.test_utils.wifi.wifi_test_utils import wifi_toggle_state |
| from acts.utils import exe_cmd |
| from acts.libs.utils.multithread import run_multithread_func |
| from bokeh.layouts import column |
| from bokeh.models import tools as bokeh_tools |
| from bokeh.plotting import figure, output_file, save |
| |
| THROUGHPUT_THRESHOLD = 100 |
| AP_START_TIME = 10 |
| DISCOVERY_TIME = 10 |
| BLUETOOTH_WAIT_TIME = 2 |
| AVRCP_WAIT_TIME = 3 |
| |
| |
| def avrcp_actions(pri_ad, bt_device): |
| """Performs avrcp controls like volume up, volume down, skip next and |
| skip previous. |
| |
| Args: |
| pri_ad: Android device. |
| bt_device: bt device instance. |
| |
| Returns: |
| True if successful, otherwise False. |
| """ |
| current_volume = pri_ad.droid.getMediaVolume() |
| for _ in range(5): |
| bt_device.volume_up() |
| time.sleep(AVRCP_WAIT_TIME) |
| if current_volume == pri_ad.droid.getMediaVolume(): |
| pri_ad.log.error("Increase volume failed") |
| return False |
| time.sleep(AVRCP_WAIT_TIME) |
| current_volume = pri_ad.droid.getMediaVolume() |
| for _ in range(5): |
| bt_device.volume_down() |
| time.sleep(AVRCP_WAIT_TIME) |
| if current_volume == pri_ad.droid.getMediaVolume(): |
| pri_ad.log.error("Decrease volume failed") |
| return False |
| |
| #TODO: (sairamganesh) validate next and previous calls. |
| bt_device.next() |
| time.sleep(AVRCP_WAIT_TIME) |
| bt_device.previous() |
| time.sleep(AVRCP_WAIT_TIME) |
| return True |
| |
| |
| def connect_ble(pri_ad, sec_ad): |
| """Connect BLE device from DUT. |
| |
| Args: |
| pri_ad: An android device object. |
| sec_ad: An android device object. |
| |
| Returns: |
| True if successful, otherwise False. |
| """ |
| adv_instances = [] |
| gatt_server_list = [] |
| bluetooth_gatt_list = [] |
| pri_ad.droid.bluetoothEnableBLE() |
| sec_ad.droid.bluetoothEnableBLE() |
| gatt_server_cb = sec_ad.droid.gattServerCreateGattServerCallback() |
| gatt_server = sec_ad.droid.gattServerOpenGattServer(gatt_server_cb) |
| gatt_server_list.append(gatt_server) |
| try: |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(pri_ad, sec_ad)) |
| bluetooth_gatt_list.append(bluetooth_gatt) |
| except GattTestUtilsError as err: |
| pri_ad.log.error(err) |
| return False |
| adv_instances.append(adv_callback) |
| connected_devices = sec_ad.droid.gattServerGetConnectedDevices(gatt_server) |
| pri_ad.log.debug("Connected device = {}".format(connected_devices)) |
| return True |
| |
| |
| def collect_bluetooth_manager_dumpsys_logs(pri_ad, test_name): |
| """Collect "adb shell dumpsys bluetooth_manager" logs. |
| |
| Args: |
| pri_ad: An android device. |
| test_name: Current test case name. |
| |
| Returns: |
| Dumpsys file path. |
| """ |
| dump_counter = 0 |
| dumpsys_path = os.path.join(pri_ad.log_path, test_name, "BluetoothDumpsys") |
| os.makedirs(dumpsys_path, exist_ok=True) |
| while os.path.exists( |
| os.path.join(dumpsys_path, |
| "bluetooth_dumpsys_%s.txt" % dump_counter)): |
| dump_counter += 1 |
| out_file = "bluetooth_dumpsys_%s.txt" % dump_counter |
| cmd = "adb -s {} shell dumpsys bluetooth_manager > {}/{}".format( |
| pri_ad.serial, dumpsys_path, out_file) |
| exe_cmd(cmd) |
| file_path = os.path.join(dumpsys_path, out_file) |
| return file_path |
| |
| |
| def configure_and_start_ap(ap, network): |
| """Configure hostapd parameters and starts access point. |
| |
| Args: |
| ap: An access point object. |
| network: A dictionary with wifi network details. |
| """ |
| hostapd_sec = None |
| if network["security"] == "wpa2": |
| hostapd_sec = hostapd_security.Security( |
| security_mode=network["security"], password=network["password"]) |
| |
| config = hostapd_config.HostapdConfig( |
| n_capabilities=[hostapd_constants.N_CAPABILITY_HT40_MINUS], |
| mode=hostapd_constants.MODE_11N_PURE, |
| channel=network["channel"], |
| ssid=network["SSID"], |
| security=hostapd_sec) |
| ap.start_ap(config) |
| time.sleep(AP_START_TIME) |
| |
| |
| def connect_dev_to_headset(pri_droid, dev_to_connect, profiles_set): |
| """Connects primary android device to headset. |
| |
| Args: |
| pri_droid: Android device initiating connection. |
| dev_to_connect: Third party headset mac address. |
| profiles_set: Profiles to be connected. |
| |
| Returns: |
| True if Pass |
| False if Fail |
| """ |
| supported_profiles = bt_profile_constants.values() |
| for profile in profiles_set: |
| if profile not in supported_profiles: |
| pri_droid.log.info("Profile {} is not supported list {}".format( |
| profile, supported_profiles)) |
| return False |
| |
| paired = False |
| for paired_device in pri_droid.droid.bluetoothGetBondedDevices(): |
| if paired_device['address'] == dev_to_connect: |
| paired = True |
| break |
| |
| if not paired: |
| pri_droid.log.info("{} not paired to {}".format(pri_droid.serial, |
| dev_to_connect)) |
| return False |
| |
| end_time = time.time() + 10 |
| profile_connected = set() |
| sec_addr = dev_to_connect |
| pri_droid.log.info("Profiles to be connected {}".format(profiles_set)) |
| |
| while (time.time() < end_time and |
| not profile_connected.issuperset(profiles_set)): |
| if (bt_profile_constants['headset_client'] not in profile_connected and |
| bt_profile_constants['headset_client'] in profiles_set): |
| if is_hfp_client_device_connected(pri_droid, sec_addr): |
| profile_connected.add(bt_profile_constants['headset_client']) |
| if (bt_profile_constants['headset'] not in profile_connected and |
| bt_profile_constants['headset'] in profiles_set): |
| profile_connected.add(bt_profile_constants['headset']) |
| if (bt_profile_constants['a2dp'] not in profile_connected and |
| bt_profile_constants['a2dp'] in profiles_set): |
| if is_a2dp_src_device_connected(pri_droid, sec_addr): |
| profile_connected.add(bt_profile_constants['a2dp']) |
| if (bt_profile_constants['a2dp_sink'] not in profile_connected and |
| bt_profile_constants['a2dp_sink'] in profiles_set): |
| if is_a2dp_snk_device_connected(pri_droid, sec_addr): |
| profile_connected.add(bt_profile_constants['a2dp_sink']) |
| if (bt_profile_constants['map_mce'] not in profile_connected and |
| bt_profile_constants['map_mce'] in profiles_set): |
| if is_map_mce_device_connected(pri_droid, sec_addr): |
| profile_connected.add(bt_profile_constants['map_mce']) |
| if (bt_profile_constants['map'] not in profile_connected and |
| bt_profile_constants['map'] in profiles_set): |
| if is_map_mse_device_connected(pri_droid, sec_addr): |
| profile_connected.add(bt_profile_constants['map']) |
| time.sleep(0.1) |
| |
| while not profile_connected.issuperset(profiles_set): |
| try: |
| time.sleep(10) |
| profile_event = pri_droid.ed.pop_event( |
| bluetooth_profile_connection_state_changed, |
| bt_default_timeout + 10) |
| pri_droid.log.info("Got event {}".format(profile_event)) |
| except Exception: |
| pri_droid.log.error("Did not get {} profiles left {}".format( |
| bluetooth_profile_connection_state_changed, profile_connected)) |
| return False |
| profile = profile_event['data']['profile'] |
| state = profile_event['data']['state'] |
| device_addr = profile_event['data']['addr'] |
| if state == bt_profile_states['connected'] and ( |
| device_addr == dev_to_connect): |
| profile_connected.add(profile) |
| pri_droid.log.info( |
| "Profiles connected until now {}".format(profile_connected)) |
| return True |
| |
| |
| def device_discoverable(pri_ad, sec_ad): |
| """Verifies whether the device is discoverable or not. |
| |
| Args: |
| pri_ad: An primary android device object. |
| sec_ad: An secondary android device object. |
| |
| Returns: |
| True if the device found, False otherwise. |
| """ |
| pri_ad.droid.bluetoothMakeDiscoverable() |
| scan_mode = pri_ad.droid.bluetoothGetScanMode() |
| if scan_mode == bt_scan_mode_types['connectable_discoverable']: |
| pri_ad.log.info("Primary device scan mode is " |
| "SCAN_MODE_CONNECTABLE_DISCOVERABLE.") |
| else: |
| pri_ad.log.info("Primary device scan mode is not " |
| "SCAN_MODE_CONNECTABLE_DISCOVERABLE.") |
| return False |
| if sec_ad.droid.bluetoothStartDiscovery(): |
| time.sleep(DISCOVERY_TIME) |
| droid_name = pri_ad.droid.bluetoothGetLocalName() |
| droid_address = pri_ad.droid.bluetoothGetLocalAddress() |
| get_discovered_devices = sec_ad.droid.bluetoothGetDiscoveredDevices() |
| find_flag = False |
| |
| if get_discovered_devices: |
| for device in get_discovered_devices: |
| if 'name' in device and device['name'] == droid_name or ( |
| 'address' in device and |
| device["address"] == droid_address): |
| pri_ad.log.info("Primary device is in the discovery " |
| "list of secondary device.") |
| find_flag = True |
| break |
| else: |
| pri_ad.log.info("Secondary device get all the discovered devices " |
| "list is empty") |
| return False |
| else: |
| pri_ad.log.info("Secondary device start discovery process error.") |
| return False |
| if not find_flag: |
| return False |
| return True |
| |
| |
| def device_discoverability(required_devices): |
| """Wrapper function to keep required_devices in discoverable mode. |
| |
| Args: |
| required_devices: List of devices to be discovered. |
| |
| Returns: |
| discovered_devices: List of BD_ADDR of devices in discoverable mode. |
| """ |
| discovered_devices = [] |
| if "AndroidDevice" in required_devices: |
| discovered_devices.extend( |
| android_device_discoverability(required_devices["AndroidDevice"])) |
| if "RelayDevice" in required_devices: |
| discovered_devices.extend( |
| relay_device_discoverability(required_devices["RelayDevice"])) |
| return discovered_devices |
| |
| |
| def android_device_discoverability(droid_dev): |
| """To keep android devices in discoverable mode. |
| |
| Args: |
| droid_dev: Android device object. |
| |
| Returns: |
| device_list: List of device discovered. |
| """ |
| device_list = [] |
| for device in range(len(droid_dev)): |
| inquiry_device = droid_dev[device] |
| if enable_bluetooth(inquiry_device.droid, inquiry_device.ed): |
| if set_bt_scan_mode(inquiry_device, |
| bt_scan_mode_types['connectable_discoverable']): |
| device_list.append( |
| inquiry_device.droid.bluetoothGetLocalAddress()) |
| else: |
| droid_dev.log.error( |
| "Device {} scan mode is not in" |
| "SCAN_MODE_CONNECTABLE_DISCOVERABLE.".format( |
| inquiry_device.droid.bluetoothGetLocalAddress())) |
| return device_list |
| |
| |
| def relay_device_discoverability(relay_devices): |
| """To keep relay controlled devices in discoverable mode. |
| |
| Args: |
| relay_devices: Relay object. |
| |
| Returns: |
| mac_address: Mac address of relay controlled device. |
| """ |
| relay_device = relay_devices[0] |
| relay_device.power_on() |
| relay_device.enter_pairing_mode() |
| return relay_device.mac_address |
| |
| |
| def disconnect_headset_from_dev(pri_ad, sec_ad, profiles_list): |
| """Disconnect primary from secondary on a specific set of profiles |
| |
| Args: |
| pri_ad: Primary android_device initiating disconnection |
| sec_ad: Secondary android droid (sl4a interface to keep the |
| method signature the same connect_pri_to_sec above) |
| profiles_list: List of profiles we want to disconnect from |
| |
| Returns: |
| True on Success |
| False on Failure |
| """ |
| supported_profiles = bt_profile_constants.values() |
| for profile in profiles_list: |
| if profile not in supported_profiles: |
| pri_ad.log.info("Profile {} is not in supported list {}".format( |
| profile, supported_profiles)) |
| return False |
| |
| pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices()) |
| |
| try: |
| pri_ad.droid.bluetoothDisconnectConnectedProfile(sec_ad, profiles_list) |
| except Exception as err: |
| pri_ad.log.error( |
| "Exception while trying to disconnect profile(s) {}: {}".format( |
| profiles_list, err)) |
| return False |
| |
| profile_disconnected = set() |
| pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list)) |
| |
| while not profile_disconnected.issuperset(profiles_list): |
| try: |
| profile_event = pri_ad.ed.pop_event( |
| bluetooth_profile_connection_state_changed, bt_default_timeout) |
| pri_ad.log.info("Got event {}".format(profile_event)) |
| except Exception: |
| pri_ad.log.warning("Did not disconnect from Profiles") |
| return True |
| |
| profile = profile_event['data']['profile'] |
| state = profile_event['data']['state'] |
| device_addr = profile_event['data']['addr'] |
| |
| if state == bt_profile_states['disconnected'] and ( |
| device_addr == sec_ad): |
| profile_disconnected.add(profile) |
| pri_ad.log.info( |
| "Profiles disconnected so far {}".format(profile_disconnected)) |
| |
| return True |
| |
| |
| def initiate_disconnect_from_hf(audio_receiver, pri_ad, sec_ad, duration): |
| """Initiates call and disconnect call on primary device. |
| |
| Steps: |
| 1. Initiate call from HF. |
| 2. Wait for dialing state at DUT and wait for ringing at secondary device. |
| 3. Accepts call from secondary device. |
| 4. Wait for call active state at primary and secondary device. |
| 5. Sleeps until given duration. |
| 6. Disconnect call from primary device. |
| 7. Wait for call is not present state. |
| |
| Args: |
| audio_receiver: An relay device object. |
| pri_ad: An android device to disconnect call. |
| sec_ad: An android device accepting call. |
| duration: Duration of call in seconds. |
| |
| Returns: |
| True if successful, False otherwise. |
| """ |
| audio_receiver.press_initiate_call() |
| time.sleep(2) |
| flag = True |
| flag &= wait_for_dialing(logging, pri_ad) |
| flag &= wait_for_ringing(logging, sec_ad) |
| if not flag: |
| pri_ad.log.error("Outgoing call did not get established") |
| return False |
| |
| if not wait_and_answer_call(logging, sec_ad): |
| pri_ad.log.error("Failed to answer call in second device.") |
| return False |
| if not wait_for_active(logging, pri_ad): |
| pri_ad.log.error("AG not in Active state.") |
| return False |
| if not wait_for_active(logging, sec_ad): |
| pri_ad.log.error("RE not in Active state.") |
| return False |
| time.sleep(duration) |
| if not hangup_call(logging, pri_ad): |
| pri_ad.log.error("Failed to hangup call.") |
| return False |
| flag = True |
| flag &= wait_for_not_in_call(logging, pri_ad) |
| flag &= wait_for_not_in_call(logging, sec_ad) |
| return flag |
| |
| |
| def initiate_disconnect_call_dut(pri_ad, sec_ad, duration, callee_number): |
| """Initiates call and disconnect call on primary device. |
| |
| Steps: |
| 1. Initiate call from DUT. |
| 2. Wait for dialing state at DUT and wait for ringing at secondary device. |
| 3. Accepts call from secondary device. |
| 4. Wait for call active state at primary and secondary device. |
| 5. Sleeps until given duration. |
| 6. Disconnect call from primary device. |
| 7. Wait for call is not present state. |
| |
| Args: |
| pri_ad: An android device to disconnect call. |
| sec_ad: An android device accepting call. |
| duration: Duration of call in seconds. |
| callee_number: Secondary device's phone number. |
| |
| Returns: |
| True if successful, False otherwise. |
| """ |
| if not initiate_call(logging, pri_ad, callee_number): |
| pri_ad.log.error("Failed to initiate call") |
| return False |
| time.sleep(2) |
| |
| flag = True |
| flag &= wait_for_dialing(logging, pri_ad) |
| flag &= wait_for_ringing(logging, sec_ad) |
| if not flag: |
| pri_ad.log.error("Outgoing call did not get established") |
| return False |
| |
| if not wait_and_answer_call(logging, sec_ad): |
| pri_ad.log.error("Failed to answer call in second device.") |
| return False |
| # Wait for AG, RE to go into an Active state. |
| if not wait_for_active(logging, pri_ad): |
| pri_ad.log.error("AG not in Active state.") |
| return False |
| if not wait_for_active(logging, sec_ad): |
| pri_ad.log.error("RE not in Active state.") |
| return False |
| time.sleep(duration) |
| if not hangup_call(logging, pri_ad): |
| pri_ad.log.error("Failed to hangup call.") |
| return False |
| flag = True |
| flag &= wait_for_not_in_call(logging, pri_ad) |
| flag &= wait_for_not_in_call(logging, sec_ad) |
| |
| return flag |
| |
| |
| def check_wifi_status(pri_ad, network, ssh_config=None): |
| """Function to check existence of wifi connection. |
| |
| Args: |
| pri_ad: An android device. |
| network: network ssid. |
| ssh_config: ssh config for iperf client. |
| """ |
| time.sleep(5) |
| proc = job.run("pgrep -f 'iperf3 -c'") |
| pid_list = proc.stdout.split() |
| |
| while True: |
| iperf_proc = job.run(["pgrep", "-f", "iperf3"]) |
| process_list = iperf_proc.stdout.split() |
| if not wifi_connection_check(pri_ad, network["SSID"]): |
| pri_ad.adb.shell("killall iperf3") |
| if ssh_config: |
| time.sleep(5) |
| ssh_settings = settings.from_config(ssh_config) |
| ssh_session = connection.SshConnection(ssh_settings) |
| result = ssh_session.run("pgrep iperf3") |
| res = result.stdout.split("\n") |
| for pid in res: |
| try: |
| ssh_session.run("kill -9 %s" % pid) |
| except Exception as e: |
| logging.warning("No such process: %s" % e) |
| for pid in pid_list[:-1]: |
| job.run(["kill", " -9", " %s" % pid], ignore_status=True) |
| else: |
| job.run(["killall", " iperf3"], ignore_status=True) |
| break |
| elif pid_list[0] not in process_list: |
| break |
| |
| |
| def iperf_result(log, protocol, result): |
| """Accepts the iperf result in json format and parse the output to |
| get throughput value. |
| |
| Args: |
| log: Logger object. |
| protocol : TCP or UDP protocol. |
| result: iperf result's filepath. |
| |
| Returns: |
| rx_rate: Data received from client. |
| """ |
| if os.path.exists(result): |
| ip_cl = IPerfResult(result) |
| |
| if protocol == "udp": |
| rx_rate = (math.fsum(ip_cl.instantaneous_rates) / |
| len(ip_cl.instantaneous_rates))*8 |
| else: |
| rx_rate = ip_cl.avg_receive_rate * 8 |
| return rx_rate |
| else: |
| log.error("IPerf file not found") |
| return False |
| |
| |
| def is_a2dp_connected(pri_ad, headset_mac_address): |
| """Convenience Function to see if the 2 devices are connected on A2DP. |
| |
| Args: |
| pri_ad : An android device. |
| headset_mac_address : Mac address of headset. |
| |
| Returns: |
| True:If A2DP connection exists, False otherwise. |
| """ |
| devices = pri_ad.droid.bluetoothA2dpGetConnectedDevices() |
| for device in devices: |
| pri_ad.log.debug("A2dp Connected device {}".format(device["name"])) |
| if device["address"] == headset_mac_address: |
| return True |
| return False |
| |
| |
| def media_stream_check(pri_ad, duration, headset_mac_address): |
| """Checks whether A2DP connecion is active or not for given duration of |
| time. |
| |
| Args: |
| pri_ad : An android device. |
| duration : No of seconds to check if a2dp streaming is alive. |
| headset_mac_address : Headset mac address. |
| |
| Returns: |
| True: If A2dp connection is active for entire duration. |
| False: If A2dp connection is not active. |
| """ |
| while time.time() < duration: |
| if not is_a2dp_connected(pri_ad, headset_mac_address): |
| pri_ad.log.error('A2dp connection not active at %s', pri_ad.serial) |
| return False |
| time.sleep(1) |
| return True |
| |
| |
| def multithread_func(log, tasks): |
| """Multi-thread function wrapper. |
| |
| Args: |
| log: log object. |
| tasks: tasks to be executed in parallel. |
| |
| Returns: |
| List of results of tasks |
| """ |
| results = run_multithread_func(log, tasks) |
| for res in results: |
| if not res: |
| return False |
| return True |
| |
| |
| def music_play_and_check(pri_ad, headset_mac_address, music_to_play, duration): |
| """Starts playing media and checks if media plays for n seconds. |
| |
| Steps: |
| 1. Starts media player on android device. |
| 2. Checks if music streaming is ongoing for n seconds. |
| 3. Stops media player. |
| 4. Collect dumpsys logs. |
| |
| Args: |
| pri_ad: An android device. |
| headset_mac_address: Mac address of third party headset. |
| music_to_play: Indicates the music file to play. |
| duration: Time in secs to indicate music time to play. |
| |
| Returns: |
| True if successful, False otherwise. |
| """ |
| pri_ad.droid.setMediaVolume(pri_ad.droid.getMaxMediaVolume() - 1) |
| pri_ad.log.info("current volume = {}".format(pri_ad.droid.getMediaVolume())) |
| pri_ad.log.debug("In music play and check") |
| if not start_media_play(pri_ad, music_to_play): |
| pri_ad.log.error("Start media play failed.") |
| return False |
| stream_time = time.time() + duration |
| if not media_stream_check(pri_ad, stream_time, headset_mac_address): |
| pri_ad.log.error("A2DP Connection check failed.") |
| pri_ad.droid.mediaPlayStop() |
| return False |
| pri_ad.droid.mediaPlayStop() |
| return True |
| |
| |
| def music_play_and_check_via_app(pri_ad, headset_mac_address, duration=5): |
| """Starts google music player and check for A2DP connection. |
| |
| Steps: |
| 1. Starts Google music player on android device. |
| 2. Checks for A2DP connection. |
| |
| Args: |
| pri_ad: An android device. |
| headset_mac_address: Mac address of third party headset. |
| duration: Total time of music streaming. |
| |
| Returns: |
| True if successful, False otherwise. |
| """ |
| pri_ad.adb.shell("am start com.google.android.music") |
| time.sleep(3) |
| pri_ad.adb.shell("input keyevent 85") |
| stream_time = time.time() + duration |
| try: |
| if not media_stream_check(pri_ad, stream_time, headset_mac_address): |
| pri_ad.log.error("A2dp connection not active at %s", pri_ad.serial) |
| return False |
| finally: |
| pri_ad.adb.shell("am force-stop com.google.android.music") |
| return True |
| |
| |
| def pair_dev_to_headset(pri_ad, dev_to_pair): |
| """Pairs primary android device with headset. |
| |
| Args: |
| pri_ad: Android device initiating connection |
| dev_to_pair: Third party headset mac address. |
| |
| Returns: |
| True if Pass |
| False if Fail |
| """ |
| bonded_devices = pri_ad.droid.bluetoothGetBondedDevices() |
| for d in bonded_devices: |
| if d['address'] == dev_to_pair: |
| pri_ad.log.info("Successfully bonded to device {}".format( |
| dev_to_pair)) |
| return True |
| pri_ad.droid.bluetoothStartDiscovery() |
| time.sleep(10) # Wait until device gets discovered |
| pri_ad.droid.bluetoothCancelDiscovery() |
| pri_ad.log.debug("Discovered bluetooth devices: {}".format( |
| pri_ad.droid.bluetoothGetDiscoveredDevices())) |
| for device in pri_ad.droid.bluetoothGetDiscoveredDevices(): |
| if device['address'] == dev_to_pair: |
| |
| result = pri_ad.droid.bluetoothDiscoverAndBond(dev_to_pair) |
| pri_ad.log.info(result) |
| end_time = time.time() + bt_default_timeout |
| pri_ad.log.info("Verifying if device bonded with {}".format( |
| dev_to_pair)) |
| time.sleep(5) # Wait time until device gets paired. |
| while time.time() < end_time: |
| bonded_devices = pri_ad.droid.bluetoothGetBondedDevices() |
| for d in bonded_devices: |
| if d['address'] == dev_to_pair: |
| pri_ad.log.info( |
| "Successfully bonded to device {}".format( |
| dev_to_pair)) |
| return True |
| pri_ad.log.error("Failed to bond with {}".format(dev_to_pair)) |
| return False |
| |
| |
| def pair_and_connect_headset(pri_ad, headset_mac_address, profile_to_connect, retry=5): |
| """Pair and connect android device with third party headset. |
| |
| Args: |
| pri_ad: An android device. |
| headset_mac_address: Mac address of third party headset. |
| profile_to_connect: Profile to be connected with headset. |
| retry: Number of times pair and connection should happen. |
| |
| Returns: |
| True if pair and connect to headset successful, or raises exception |
| on failure. |
| """ |
| |
| paired = False |
| for i in range(1, retry): |
| if pair_dev_to_headset(pri_ad, headset_mac_address): |
| paired = True |
| break |
| else: |
| pri_ad.log.error("Attempt {} out of {}, Failed to pair, " |
| "Retrying.".format(i, retry)) |
| |
| if paired: |
| for i in range(1, retry): |
| if connect_dev_to_headset(pri_ad, headset_mac_address, |
| profile_to_connect): |
| return True |
| else: |
| pri_ad.log.error("Attempt {} out of {}, Failed to connect, " |
| "Retrying.".format(i, retry)) |
| else: |
| asserts.fail("Failed to pair and connect with {}".format( |
| headset_mac_address)) |
| |
| |
| def perform_classic_discovery(pri_ad, duration, file_name, dev_list=None): |
| """Convenience function to start and stop device discovery. |
| |
| Args: |
| pri_ad: An android device. |
| duration: iperf duration of the test. |
| file_name: Json file to which result is dumped |
| dev_list: List of devices to be discoverable mode. |
| |
| Returns: |
| True start and stop discovery is successful, False otherwise. |
| """ |
| if dev_list: |
| devices_required = device_discoverability(dev_list) |
| else: |
| devices_required = None |
| iteration = 0 |
| result = {} |
| result["discovered_devices"] = {} |
| discover_result = [] |
| start_time = time.time() |
| while time.time() < start_time + duration: |
| if not pri_ad.droid.bluetoothStartDiscovery(): |
| pri_ad.log.error("Failed to start discovery") |
| return False |
| time.sleep(DISCOVERY_TIME) |
| if not pri_ad.droid.bluetoothCancelDiscovery(): |
| pri_ad.log.error("Failed to cancel discovery") |
| return False |
| pri_ad.log.info("Discovered device list {}".format( |
| pri_ad.droid.bluetoothGetDiscoveredDevices())) |
| if devices_required is not None: |
| result["discovered_devices"][iteration] = [] |
| devices_name = { |
| element.get('name', element['address']) |
| for element in pri_ad.droid.bluetoothGetDiscoveredDevices() |
| if element["address"] in devices_required |
| } |
| result["discovered_devices"][iteration] = list(devices_name) |
| discover_result.extend([len(devices_name) == len(devices_required)]) |
| iteration += 1 |
| with open(file_name, 'a') as results_file: |
| json.dump(result, results_file, indent=4) |
| if False in discover_result: |
| return False |
| else: |
| pri_ad.log.warning("No devices are kept in discoverable mode") |
| return True |
| |
| |
| def connect_wlan_profile(pri_ad, network): |
| """Disconnect and Connect to AP. |
| |
| Args: |
| pri_ad: An android device. |
| network: Network to which AP to be connected. |
| |
| Returns: |
| True if successful, False otherwise. |
| """ |
| reset_wifi(pri_ad) |
| wifi_toggle_state(pri_ad, False) |
| wifi_test_device_init(pri_ad) |
| wifi_connect(pri_ad, network) |
| if not wifi_connection_check(pri_ad, network["SSID"]): |
| pri_ad.log.error("Wifi connection does not exist.") |
| return False |
| return True |
| |
| |
| def toggle_bluetooth(pri_ad, duration): |
| """Toggles bluetooth on/off for N iterations. |
| |
| Args: |
| pri_ad: An android device object. |
| duration: Iperf duration of the test. |
| |
| Returns: |
| True if successful, False otherwise. |
| """ |
| start_time = time.time() |
| while time.time() < start_time + duration: |
| if not enable_bluetooth(pri_ad.droid, pri_ad.ed): |
| pri_ad.log.error("Failed to enable bluetooth") |
| return False |
| time.sleep(BLUETOOTH_WAIT_TIME) |
| if not disable_bluetooth(pri_ad.droid): |
| pri_ad.log.error("Failed to turn off bluetooth") |
| return False |
| time.sleep(BLUETOOTH_WAIT_TIME) |
| return True |
| |
| |
| def toggle_screen_state(pri_ad, duration): |
| """Toggles the screen state to on or off.. |
| |
| Args: |
| pri_ad: Android device. |
| duration: Iperf duration of the test. |
| |
| Returns: |
| True if successful, False otherwise. |
| """ |
| start_time = time.time() |
| while time.time() < start_time + duration: |
| if not pri_ad.ensure_screen_on(): |
| pri_ad.log.error("User window cannot come up") |
| return False |
| if not pri_ad.go_to_sleep(): |
| pri_ad.log.info("Screen off") |
| return True |
| |
| |
| def setup_tel_config(pri_ad, sec_ad, sim_conf_file): |
| """Sets tel properties for primary device and secondary devices |
| |
| Args: |
| pri_ad: An android device object. |
| sec_ad: An android device object. |
| sim_conf_file: Sim card map. |
| |
| Returns: |
| pri_ad_num: Phone number of primary device. |
| sec_ad_num: Phone number of secondary device. |
| """ |
| setup_droid_properties(logging, pri_ad, sim_conf_file) |
| pri_ad_num = get_phone_number(logging, pri_ad) |
| setup_droid_properties(logging, sec_ad, sim_conf_file) |
| sec_ad_num = get_phone_number(logging, sec_ad) |
| return pri_ad_num, sec_ad_num |
| |
| |
| def start_fping(pri_ad, duration, fping_params): |
| """Starts fping to ping for DUT's ip address. |
| |
| Steps: |
| 1. Run fping command to check DUT's IP is alive or not. |
| |
| Args: |
| pri_ad: An android device object. |
| duration: Duration of fping in seconds. |
| fping_params: List of parameters for fping to run. |
| |
| Returns: |
| True if successful, False otherwise. |
| """ |
| counter = 0 |
| fping_path = ''.join((pri_ad.log_path, "/Fping")) |
| os.makedirs(fping_path, exist_ok=True) |
| while os.path.isfile(fping_path + "/fping_%s.txt" % counter): |
| counter += 1 |
| out_file_name = "{}".format("fping_%s.txt" % counter) |
| |
| full_out_path = os.path.join(fping_path, out_file_name) |
| cmd = "fping {} -D -c {}".format(get_phone_ip(pri_ad), duration) |
| if fping_params["ssh_config"]: |
| ssh_settings = settings.from_config(fping_params["ssh_config"]) |
| ssh_session = connection.SshConnection(ssh_settings) |
| try: |
| with open(full_out_path, 'w') as outfile: |
| job_result = ssh_session.run(cmd) |
| outfile.write(job_result.stdout) |
| outfile.write("\n") |
| outfile.writelines(job_result.stderr) |
| except Exception as err: |
| pri_ad.log.error("Fping run has been failed. = {}".format(err)) |
| return False |
| else: |
| cmd = cmd.split() |
| with open(full_out_path, "w") as f: |
| job.run(cmd) |
| result = parse_fping_results(fping_params["fping_drop_tolerance"], |
| full_out_path) |
| return bool(result) |
| |
| |
| def parse_fping_results(failure_rate, full_out_path): |
| """Calculates fping results. |
| |
| Steps: |
| 1. Read the file and calculate the results. |
| |
| Args: |
| failure_rate: Fping packet drop tolerance value. |
| full_out_path: path where the fping results has been stored. |
| |
| Returns: |
| loss_percent: loss percentage of fping packet. |
| """ |
| try: |
| result_file = open(full_out_path, "r") |
| lines = result_file.readlines() |
| res_line = lines[-1] |
| # Ex: res_line = "192.168.186.224 : xmt/rcv/%loss = 10/10/0%, |
| # min/avg/max = 36.7/251/1272" |
| loss_percent = re.search("[0-9]+%", res_line) |
| if int(loss_percent.group().strip("%")) > failure_rate: |
| logging.error("Packet drop observed") |
| return False |
| return loss_percent.group() |
| except Exception as e: |
| logging.error("Error in parsing fping results : %s" %(e)) |
| return False |
| |
| |
| def start_media_play(pri_ad, music_file_to_play): |
| """Starts media player on device. |
| |
| Args: |
| pri_ad : An android device. |
| music_file_to_play : An audio file to play. |
| |
| Returns: |
| True:If media player start music, False otherwise. |
| """ |
| if not pri_ad.droid.mediaPlayOpen( |
| "file:///sdcard/Music/{}".format(music_file_to_play)): |
| pri_ad.log.error("Failed to play music") |
| return False |
| |
| pri_ad.droid.mediaPlaySetLooping(True) |
| pri_ad.log.info("Music is now playing on device {}".format(pri_ad.serial)) |
| return True |
| |
| |
| def wifi_connection_check(pri_ad, ssid): |
| """Function to check existence of wifi connection. |
| |
| Args: |
| pri_ad : An android device. |
| ssid : wifi ssid to check. |
| |
| Returns: |
| True if wifi connection exists, False otherwise. |
| """ |
| wifi_info = pri_ad.droid.wifiGetConnectionInfo() |
| if (wifi_info["SSID"] == ssid and |
| wifi_info["supplicant_state"] == "completed"): |
| return True |
| pri_ad.log.error("Wifi Connection check failed : {}".format(wifi_info)) |
| return False |
| |
| |
| def push_music_to_android_device(ad, audio_params): |
| """Add music to Android device as specified by the test config |
| |
| Args: |
| ad: Android device |
| audio_params: Music file to push. |
| |
| Returns: |
| True on success, False on failure |
| """ |
| ad.log.info("Pushing music to the Android device") |
| android_music_path = "/sdcard/Music/" |
| music_path = audio_params["music_file"] |
| if type(music_path) is list: |
| ad.log.info("Media ready to push as is.") |
| for item in music_path: |
| music_file_to_play = item |
| ad.adb.push(item, android_music_path) |
| return music_file_to_play |
| else: |
| music_file_to_play = audio_params["music_file"] |
| ad.adb.push("{} {}".format(music_file_to_play, android_music_path)) |
| return (os.path.basename(music_file_to_play)) |
| |
| def bokeh_plot(data_sets, |
| legends, |
| fig_property, |
| shaded_region=None, |
| output_file_path=None): |
| """Plot bokeh figs. |
| Args: |
| data_sets: data sets including lists of x_data and lists of y_data |
| ex: [[[x_data1], [x_data2]], [[y_data1],[y_data2]]] |
| legends: list of legend for each curve |
| fig_property: dict containing the plot property, including title, |
| labels, linewidth, circle size, etc. |
| shaded_region: optional dict containing data for plot shading |
| output_file_path: optional path at which to save figure |
| Returns: |
| plot: bokeh plot figure object |
| """ |
| tools = 'box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save' |
| plot = figure(plot_width=1300, |
| plot_height=700, |
| title=fig_property['title'], |
| tools=tools, |
| output_backend="webgl") |
| plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="width")) |
| plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="height")) |
| colors = [ |
| 'red', 'green', 'blue', 'olive', 'orange', 'salmon', 'black', 'navy', |
| 'yellow', 'darkred', 'goldenrod' |
| ] |
| if shaded_region: |
| band_x = shaded_region["x_vector"] |
| band_x.extend(shaded_region["x_vector"][::-1]) |
| band_y = shaded_region["lower_limit"] |
| band_y.extend(shaded_region["upper_limit"][::-1]) |
| plot.patch(band_x, |
| band_y, |
| color='#7570B3', |
| line_alpha=0.1, |
| fill_alpha=0.1) |
| |
| for x_data, y_data, legend in zip(data_sets[0], data_sets[1], legends): |
| index_now = legends.index(legend) |
| color = colors[index_now % len(colors)] |
| plot.line(x_data, |
| y_data, |
| legend=str(legend), |
| line_width=fig_property['linewidth'], |
| color=color) |
| plot.circle(x_data, |
| y_data, |
| size=fig_property['markersize'], |
| legend=str(legend), |
| fill_color=color) |
| |
| # Plot properties |
| plot.xaxis.axis_label = fig_property['x_label'] |
| plot.yaxis.axis_label = fig_property['y_label'] |
| plot.legend.location = "top_right" |
| plot.legend.click_policy = "hide" |
| plot.title.text_font_size = {'value': '15pt'} |
| if output_file_path is not None: |
| output_file(output_file_path) |
| save(plot) |
| return plot |
| |
| def bokeh_chart_plot(bt_attenuation_range, |
| data_sets, |
| legends, |
| fig_property, |
| shaded_region=None, |
| output_file_path=None): |
| """Plot bokeh figs. |
| |
| Args: |
| bt_attenuation_range: range of BT attenuation. |
| data_sets: data sets including lists of x_data and lists of y_data |
| ex: [[[x_data1], [x_data2]], [[y_data1],[y_data2]]] |
| legends: list of legend for each curve |
| fig_property: dict containing the plot property, including title, |
| labels, linewidth, circle size, etc. |
| shaded_region: optional dict containing data for plot shading |
| output_file_path: optional path at which to save figure |
| |
| Returns: |
| plot: bokeh plot figure object |
| """ |
| TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save') |
| colors = [ |
| 'red', 'green', 'blue', 'olive', 'orange', 'salmon', 'black', 'navy', |
| 'yellow', 'darkred', 'goldenrod' |
| ] |
| plot = [] |
| data = [[], []] |
| legend = [] |
| for i in bt_attenuation_range: |
| if "Packet drop" in legends[i][0]: |
| plot_info = {0: "A2dp_packet_drop_plot", 1: "throughput_plot"} |
| else: |
| plot_info = {0: "throughput_plot"} |
| for j in plot_info: |
| if "Packet drops" in legends[i][j]: |
| if data_sets[i]["a2dp_packet_drops"]: |
| plot_i_j = figure( |
| plot_width=1000, |
| plot_height=500, |
| title=fig_property['title'], |
| tools=TOOLS) |
| |
| plot_i_j.add_tools( |
| bokeh_tools.WheelZoomTool(dimensions="width")) |
| plot_i_j.add_tools( |
| bokeh_tools.WheelZoomTool(dimensions="height")) |
| plot_i_j.xaxis.axis_label = fig_property['x_label'] |
| plot_i_j.yaxis.axis_label = fig_property['y_label'][j] |
| plot_i_j.legend.location = "top_right" |
| plot_i_j.legend.click_policy = "hide" |
| plot_i_j.title.text_font_size = {'value': '15pt'} |
| |
| plot_i_j.line( |
| data_sets[i]["a2dp_attenuation"], |
| data_sets[i]["a2dp_packet_drops"], |
| legend=legends[i][j], |
| line_width=3, |
| color=colors[j]) |
| plot_i_j.circle( |
| data_sets[i]["a2dp_attenuation"], |
| data_sets[i]["a2dp_packet_drops"], |
| legend=str(legends[i][j]), |
| fill_color=colors[j]) |
| plot.append(plot_i_j) |
| elif "Performance Results" in legends[i][j]: |
| plot_i_j = figure( |
| plot_width=1000, |
| plot_height=500, |
| title=fig_property['title'], |
| tools=TOOLS) |
| plot_i_j.add_tools( |
| bokeh_tools.WheelZoomTool(dimensions="width")) |
| plot_i_j.add_tools( |
| bokeh_tools.WheelZoomTool(dimensions="height")) |
| plot_i_j.xaxis.axis_label = fig_property['x_label'] |
| plot_i_j.yaxis.axis_label = fig_property['y_label'][j] |
| plot_i_j.legend.location = "top_right" |
| plot_i_j.legend.click_policy = "hide" |
| plot_i_j.title.text_font_size = {'value': '15pt'} |
| data[0].insert(0, data_sets[i]["attenuation"]) |
| data[1].insert(0, data_sets[i]["throughput_received"]) |
| legend.insert(0, legends[i][j + 1]) |
| plot_i_j.line( |
| data_sets[i]["user_attenuation"], |
| data_sets[i]["user_throughput"], |
| legend=legends[i][j], |
| line_width=3, |
| color=colors[j]) |
| plot_i_j.circle( |
| data_sets[i]["user_attenuation"], |
| data_sets[i]["user_throughput"], |
| legend=str(legends[i][j]), |
| fill_color=colors[j]) |
| plot_i_j.line( |
| data_sets[i]["attenuation"], |
| data_sets[i]["throughput_received"], |
| legend=legends[i][j + 1], |
| line_width=3, |
| color=colors[j]) |
| plot_i_j.circle( |
| data_sets[i]["attenuation"], |
| data_sets[i]["throughput_received"], |
| legend=str(legends[i][j + 1]), |
| fill_color=colors[j]) |
| if shaded_region: |
| band_x = shaded_region[i]["x_vector"] |
| band_x.extend(shaded_region[i]["x_vector"][::-1]) |
| band_y = shaded_region[i]["lower_limit"] |
| band_y.extend(shaded_region[i]["upper_limit"][::-1]) |
| plot_i_j.patch( |
| band_x, |
| band_y, |
| color='#7570B3', |
| line_alpha=0.1, |
| fill_alpha=0.1) |
| plot.append(plot_i_j) |
| else: |
| plot_i_j = figure( |
| plot_width=1000, |
| plot_height=500, |
| title=fig_property['title'], |
| tools=TOOLS) |
| plot_i_j.add_tools( |
| bokeh_tools.WheelZoomTool(dimensions="width")) |
| plot_i_j.add_tools( |
| bokeh_tools.WheelZoomTool(dimensions="height")) |
| plot_i_j.xaxis.axis_label = fig_property['x_label'] |
| plot_i_j.yaxis.axis_label = fig_property['y_label'][j] |
| plot_i_j.legend.location = "top_right" |
| plot_i_j.legend.click_policy = "hide" |
| plot_i_j.title.text_font_size = {'value': '15pt'} |
| data[0].insert(0, data_sets[i]["attenuation"]) |
| data[1].insert(0, data_sets[i]["throughput_received"]) |
| legend.insert(0, legends[i][j]) |
| plot_i_j.line( |
| data_sets[i]["attenuation"], |
| data_sets[i]["throughput_received"], |
| legend=legends[i][j], |
| line_width=3, |
| color=colors[j]) |
| plot_i_j.circle( |
| data_sets[i]["attenuation"], |
| data_sets[i]["throughput_received"], |
| legend=str(legends[i][j]), |
| fill_color=colors[j]) |
| plot.append(plot_i_j) |
| fig_property['y_label'] = "Throughput (Mbps)" |
| all_plot = bokeh_plot(data, legend, fig_property, shaded_region=None, |
| output_file_path=None) |
| plot.insert(0, all_plot) |
| if output_file_path is not None: |
| output_file(output_file_path) |
| save(column(plot)) |
| return plot |
| |
| |
| class A2dpDumpsysParser(): |
| |
| def __init__(self): |
| self.count_list = [] |
| self.frame_list = [] |
| self.dropped_count = None |
| |
| def parse(self, file_path): |
| """Convenience function to parse a2dp dumpsys logs. |
| |
| Args: |
| file_path: Path of dumpsys logs. |
| |
| Returns: |
| dropped_list containing packet drop count for every iteration. |
| drop containing list of all packets dropped for test suite. |
| """ |
| a2dp_dumpsys_info = [] |
| with open(file_path) as dumpsys_file: |
| for line in dumpsys_file: |
| if "A2DP State:" in line: |
| a2dp_dumpsys_info.append(line) |
| elif "Counts (max dropped)" not in line and len( |
| a2dp_dumpsys_info) > 0: |
| a2dp_dumpsys_info.append(line) |
| elif "Counts (max dropped)" in line: |
| a2dp_dumpsys_info = ''.join(a2dp_dumpsys_info) |
| a2dp_info = a2dp_dumpsys_info.split("\n") |
| # Ex: Frames per packet (total/max/ave) : 5034 / 1 / 0 |
| frames = int(re.split("[':/()]", str(a2dp_info[-3]))[-3]) |
| self.frame_list.append(frames) |
| # Ex : Counts (flushed/dropped/dropouts) : 0 / 4 / 0 |
| count = int(re.split("[':/()]", str(a2dp_info[-2]))[-2]) |
| if count > 0: |
| for i in range(len(self.count_list)): |
| count = count - self.count_list[i] |
| self.count_list.append(count) |
| if len(self.frame_list) > 1: |
| last_frame = self.frame_list[-1] - self.frame_list[ |
| -2] |
| self.dropped_count = (count / last_frame) * 100 |
| else: |
| self.dropped_count = ( |
| count / self.frame_list[-1]) * 100 |
| else: |
| self.dropped_count = count |
| logging.info(a2dp_dumpsys_info) |
| return self.dropped_count |