| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2019 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Stream music through connected device from phone across different |
| attenuations.""" |
| |
| import json |
| import math |
| import time |
| import logging |
| import acts.controllers.iperf_client as ipc |
| import acts.controllers.iperf_server as ipf |
| import acts_contrib.test_utils.bt.bt_test_utils as btutils |
| from acts import asserts |
| from acts_contrib.test_utils.bt.A2dpBaseTest import A2dpBaseTest |
| from acts_contrib.test_utils.bt.loggers import bluetooth_metric_logger as log |
| from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wpeutils |
| from acts_contrib.test_utils.wifi import wifi_power_test_utils as wputils |
| from acts_contrib.test_utils.wifi import wifi_test_utils as wutils |
| from acts_contrib.test_utils.power.PowerBaseTest import ObjNew |
| |
| MAX_ATTENUATION = 95 |
| TEMP_FILE = '/sdcard/Download/tmp.log' |
| IPERF_CLIENT_ERROR = 'the client has unexpectedly closed the connection' |
| |
| |
| def setup_ap_connection(dut, network, ap, bandwidth=20): |
| """Setup AP and connect DUT to it. |
| |
| Args: |
| dut: the android device to connect and run traffic |
| network: the network config for the AP to be setup |
| ap: access point object |
| bandwidth: bandwidth of the WiFi network to be setup |
| Returns: |
| brconfigs: dict for bridge interface configs |
| """ |
| wutils.wifi_toggle_state(dut, True) |
| brconfigs = wputils.ap_setup(ap, network, bandwidth=bandwidth) |
| wutils.wifi_connect(dut, network, num_of_tries=3) |
| return brconfigs |
| |
| |
| def start_iperf_client(traffic_pair_obj, duration): |
| """Setup iperf traffic for TCP downlink. |
| Args: |
| traffic_pair_obj: obj to contain info on traffic pair |
| duration: duration of iperf traffic to run |
| """ |
| # Construct the iperf command based on the test params |
| iperf_cmd = 'iperf3 -c {} -i 1 -t {} -p {} -J -R > {}'.format( |
| traffic_pair_obj.server_address, duration, |
| traffic_pair_obj.iperf_server.port, TEMP_FILE) |
| # Start IPERF client |
| traffic_pair_obj.dut.adb.shell_nb(iperf_cmd) |
| |
| |
| def unpack_custom_file(file): |
| """Unpack the json file to . |
| |
| Args: |
| file: custom json file. |
| """ |
| with open(file, 'r') as f: |
| params = json.load(f) |
| return params |
| |
| |
| def get_iperf_results(iperf_server_obj): |
| """Get the iperf results and process. |
| |
| Args: |
| iperf_server_obj: the IperfServer object |
| Returns: |
| throughput: the average throughput during tests. |
| """ |
| # Get IPERF results and add this to the plot title |
| iperf_file = iperf_server_obj.log_files[-1] |
| try: |
| iperf_result = ipf.IPerfResult(iperf_file) |
| # Compute the throughput in Mbit/s |
| if iperf_result.error == IPERF_CLIENT_ERROR: |
| rates = [] |
| for item in iperf_result.result['intervals']: |
| rates.append(item['sum']['bits_per_second'] / 8 / 1024 / 1024) |
| throughput = ((math.fsum(rates) / len(rates))) * 8 * (1.024**2) |
| else: |
| throughput = (math.fsum(iperf_result.instantaneous_rates) / len( |
| iperf_result.instantaneous_rates)) * 8 * (1.024**2) |
| except (ValueError, TypeError): |
| throughput = 0 |
| return throughput |
| |
| |
| def locate_interference_pair_by_channel(wifi_int_pairs, interference_channels): |
| """Function to find which attenautor to set based on channel info |
| Args: |
| interference_channels: list of interference channels |
| Return: |
| interference_pair_indices: list of indices for interference pair |
| in wifi_int_pairs |
| """ |
| interference_pair_indices = [] |
| for chan in interference_channels: |
| for i in range(len(wifi_int_pairs)): |
| if wifi_int_pairs[i].channel == chan: |
| interference_pair_indices.append(i) |
| return interference_pair_indices |
| |
| |
| def inject_static_wifi_interference(wifi_int_pairs, interference_level, |
| channels): |
| """Function to inject wifi interference to bt link and read rssi. |
| |
| Interference of IPERF traffic is always running, by setting attenuation, |
| the gate is opened to release the interference to the setup. |
| Args: |
| interference_level: the signal strength of wifi interference, use |
| attenuation level to represent this |
| channels: wifi channels where interference will |
| be injected, list |
| """ |
| all_pair = range(len(wifi_int_pairs)) |
| interference_pair_indices = locate_interference_pair_by_channel( |
| wifi_int_pairs, channels) |
| inactive_interference_pairs_indices = [ |
| item for item in all_pair if item not in interference_pair_indices |
| ] |
| logging.info('WiFi interference at {} and inactive channels at {}'.format( |
| interference_pair_indices, inactive_interference_pairs_indices)) |
| for i in interference_pair_indices: |
| wifi_int_pairs[i].attenuator.set_atten(interference_level) |
| logging.info('Set attenuation {} dB on attenuator {}'.format( |
| wifi_int_pairs[i].attenuator.get_atten(), i + 1)) |
| for i in inactive_interference_pairs_indices: |
| wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION) |
| logging.info('Set attenuation {} dB on attenuator {}'.format( |
| wifi_int_pairs[i].attenuator.get_atten(), i + 1)) |
| |
| |
| class BtInterferenceBaseTest(A2dpBaseTest): |
| def __init__(self, configs): |
| super().__init__(configs) |
| self.bt_logger = log.BluetoothMetricLogger.for_test_case() |
| self.start_time = time.time() |
| req_params = [ |
| 'attenuation_vector', 'wifi_networks', 'codecs', 'custom_files', |
| 'audio_params' |
| ] |
| self.unpack_userparams(req_params) |
| for file in self.custom_files: |
| if 'static_interference' in file: |
| self.static_wifi_interference = unpack_custom_file(file) |
| elif 'dynamic_interference' in file: |
| self.dynamic_wifi_interference = unpack_custom_file(file) |
| |
| def setup_class(self): |
| super().setup_class() |
| # Build object to store all necessary information for each pair of wifi |
| # interference setup: phone, ap, network, channel, iperf server port/ip |
| # object and bridge interface configs |
| if len(self.android_devices) < 5 or len(self.attenuators) < 4: |
| self.log.error('Need a 4 channel attenuator and 5 android phones' |
| 'please update the config file') |
| self.wifi_int_pairs = [] |
| for i in range(len(self.attenuators) - 1): |
| tmp_dict = { |
| 'dut': self.android_devices[i + 1], |
| 'ap': self.access_points[i], |
| 'network': self.wifi_networks[i], |
| 'channel': self.wifi_networks[i]['channel'], |
| 'iperf_server': self.iperf_servers[i], |
| 'attenuator': self.attenuators[i + 1], |
| 'ether_int': self.packet_senders[i], |
| 'iperf_client': |
| ipc.IPerfClientOverAdb(self.android_devices[i + 1]) |
| } |
| tmp_obj = ObjNew(**tmp_dict) |
| self.wifi_int_pairs.append(tmp_obj) |
| ##Setup connection between WiFi APs and Phones and get DHCP address |
| # for the interface |
| for obj in self.wifi_int_pairs: |
| brconfigs = setup_ap_connection(obj.dut, obj.network, obj.ap) |
| iperf_server_address = wputils.wait_for_dhcp( |
| obj.ether_int.interface) |
| setattr(obj, 'server_address', iperf_server_address) |
| setattr(obj, 'brconfigs', brconfigs) |
| obj.attenuator.set_atten(MAX_ATTENUATION) |
| # Enable BQR on master and slave Android device |
| btutils.enable_bqr(self.dut) |
| btutils.enable_bqr(self.bt_device_controller) |
| |
| def teardown_class(self): |
| super().teardown_class() |
| for obj in self.wifi_int_pairs: |
| obj.ap.bridge.teardown(obj.brconfigs) |
| self.log.info('Stop IPERF server at port {}'.format( |
| obj.iperf_server.port)) |
| obj.iperf_server.stop() |
| self.log.info('Stop IPERF process on {}'.format(obj.dut.serial)) |
| #only for glinux machine |
| # wputils.bring_down_interface(obj.ether_int.interface) |
| obj.attenuator.set_atten(MAX_ATTENUATION) |
| obj.ap.close() |
| |
| def teardown_test(self): |
| |
| super().teardown_test() |
| for obj in self.wifi_int_pairs: |
| obj.attenuator.set_atten(MAX_ATTENUATION) |
| |
| def play_and_record_audio(self, duration, queue): |
| """Play and record audio for a set duration. |
| |
| Args: |
| duration: duration in seconds for music playing |
| que: multiprocess que to store the return value of this function |
| Returns: |
| audio_captured: captured audio file path |
| """ |
| |
| self.log.info('Play and record audio for {} second'.format(duration)) |
| self.media.play() |
| self.audio_device.start() |
| time.sleep(duration) |
| audio_captured = self.audio_device.stop() |
| self.media.stop() |
| self.log.info('Audio play and record stopped') |
| asserts.assert_true(audio_captured, 'Audio not recorded') |
| queue.put(audio_captured) |
| |
| def locate_interference_pair_by_channel(self, interference_channels): |
| """Function to find which attenautor to set based on channel info |
| Args: |
| interference_channels: list of interference channels |
| Return: |
| interference_pair_indices: list of indices for interference pair |
| in self.wifi_int_pairs |
| """ |
| interference_pair_indices = [] |
| for chan in interference_channels: |
| for i in range(len(self.wifi_int_pairs)): |
| if self.wifi_int_pairs[i].channel == chan: |
| interference_pair_indices.append(i) |
| return interference_pair_indices |
| |
| def get_interference_rssi(self): |
| """Function to read wifi interference RSSI level.""" |
| |
| bssids = [] |
| self.interference_rssi = [] |
| wutils.wifi_toggle_state(self.android_devices[0], True) |
| for item in self.wifi_int_pairs: |
| ssid = item.network['SSID'] |
| bssid = item.ap.get_bssid_from_ssid(ssid, '2g') |
| bssids.append(bssid) |
| interference_rssi_dict = { |
| "ssid": ssid, |
| "bssid": bssid, |
| "chan": item.channel, |
| "rssi": 0 |
| } |
| self.interference_rssi.append(interference_rssi_dict) |
| scaned_rssi = wpeutils.get_scan_rssi(self.android_devices[0], |
| bssids, |
| num_measurements=2) |
| for item in self.interference_rssi: |
| item['rssi'] = scaned_rssi[item['bssid']]['mean'] |
| self.log.info('Interference RSSI at channel {} is {} dBm'.format( |
| item['chan'], item['rssi'])) |
| wutils.wifi_toggle_state(self.android_devices[0], False) |