blob: 1251cd81c0c614e70b187597dcbfaeecce21d443 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Stream music through connected device from phone across different
attenuations."""
import json
import math
import time
import logging
import acts.controllers.iperf_client as ipc
import acts.controllers.iperf_server as ipf
import acts_contrib.test_utils.bt.bt_test_utils as btutils
from acts import asserts
from acts_contrib.test_utils.bt.A2dpBaseTest import A2dpBaseTest
from acts_contrib.test_utils.bt.loggers import bluetooth_metric_logger as log
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wpeutils
from acts_contrib.test_utils.wifi import wifi_power_test_utils as wputils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.power.PowerBaseTest import ObjNew
MAX_ATTENUATION = 95
TEMP_FILE = '/sdcard/Download/tmp.log'
IPERF_CLIENT_ERROR = 'the client has unexpectedly closed the connection'
def setup_ap_connection(dut, network, ap, bandwidth=20):
"""Setup AP and connect DUT to it.
Args:
dut: the android device to connect and run traffic
network: the network config for the AP to be setup
ap: access point object
bandwidth: bandwidth of the WiFi network to be setup
Returns:
brconfigs: dict for bridge interface configs
"""
wutils.wifi_toggle_state(dut, True)
brconfigs = wputils.ap_setup(ap, network, bandwidth=bandwidth)
wutils.wifi_connect(dut, network, num_of_tries=3)
return brconfigs
def start_iperf_client(traffic_pair_obj, duration):
"""Setup iperf traffic for TCP downlink.
Args:
traffic_pair_obj: obj to contain info on traffic pair
duration: duration of iperf traffic to run
"""
# Construct the iperf command based on the test params
iperf_cmd = 'iperf3 -c {} -i 1 -t {} -p {} -J -R > {}'.format(
traffic_pair_obj.server_address, duration,
traffic_pair_obj.iperf_server.port, TEMP_FILE)
# Start IPERF client
traffic_pair_obj.dut.adb.shell_nb(iperf_cmd)
def unpack_custom_file(file):
"""Unpack the json file to .
Args:
file: custom json file.
"""
with open(file, 'r') as f:
params = json.load(f)
return params
def get_iperf_results(iperf_server_obj):
"""Get the iperf results and process.
Args:
iperf_server_obj: the IperfServer object
Returns:
throughput: the average throughput during tests.
"""
# Get IPERF results and add this to the plot title
iperf_file = iperf_server_obj.log_files[-1]
try:
iperf_result = ipf.IPerfResult(iperf_file)
# Compute the throughput in Mbit/s
if iperf_result.error == IPERF_CLIENT_ERROR:
rates = []
for item in iperf_result.result['intervals']:
rates.append(item['sum']['bits_per_second'] / 8 / 1024 / 1024)
throughput = ((math.fsum(rates) / len(rates))) * 8 * (1.024**2)
else:
throughput = (math.fsum(iperf_result.instantaneous_rates) / len(
iperf_result.instantaneous_rates)) * 8 * (1.024**2)
except (ValueError, TypeError):
throughput = 0
return throughput
def locate_interference_pair_by_channel(wifi_int_pairs, interference_channels):
"""Function to find which attenautor to set based on channel info
Args:
interference_channels: list of interference channels
Return:
interference_pair_indices: list of indices for interference pair
in wifi_int_pairs
"""
interference_pair_indices = []
for chan in interference_channels:
for i in range(len(wifi_int_pairs)):
if wifi_int_pairs[i].channel == chan:
interference_pair_indices.append(i)
return interference_pair_indices
def inject_static_wifi_interference(wifi_int_pairs, interference_level,
channels):
"""Function to inject wifi interference to bt link and read rssi.
Interference of IPERF traffic is always running, by setting attenuation,
the gate is opened to release the interference to the setup.
Args:
interference_level: the signal strength of wifi interference, use
attenuation level to represent this
channels: wifi channels where interference will
be injected, list
"""
all_pair = range(len(wifi_int_pairs))
interference_pair_indices = locate_interference_pair_by_channel(
wifi_int_pairs, channels)
inactive_interference_pairs_indices = [
item for item in all_pair if item not in interference_pair_indices
]
logging.info('WiFi interference at {} and inactive channels at {}'.format(
interference_pair_indices, inactive_interference_pairs_indices))
for i in interference_pair_indices:
wifi_int_pairs[i].attenuator.set_atten(interference_level)
logging.info('Set attenuation {} dB on attenuator {}'.format(
wifi_int_pairs[i].attenuator.get_atten(), i + 1))
for i in inactive_interference_pairs_indices:
wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
logging.info('Set attenuation {} dB on attenuator {}'.format(
wifi_int_pairs[i].attenuator.get_atten(), i + 1))
class BtInterferenceBaseTest(A2dpBaseTest):
def __init__(self, configs):
super().__init__(configs)
self.bt_logger = log.BluetoothMetricLogger.for_test_case()
self.start_time = time.time()
req_params = [
'attenuation_vector', 'wifi_networks', 'codecs', 'custom_files',
'audio_params'
]
self.unpack_userparams(req_params)
for file in self.custom_files:
if 'static_interference' in file:
self.static_wifi_interference = unpack_custom_file(file)
elif 'dynamic_interference' in file:
self.dynamic_wifi_interference = unpack_custom_file(file)
def setup_class(self):
super().setup_class()
# Build object to store all necessary information for each pair of wifi
# interference setup: phone, ap, network, channel, iperf server port/ip
# object and bridge interface configs
if len(self.android_devices) < 5 or len(self.attenuators) < 4:
self.log.error('Need a 4 channel attenuator and 5 android phones'
'please update the config file')
self.wifi_int_pairs = []
for i in range(len(self.attenuators) - 1):
tmp_dict = {
'dut': self.android_devices[i + 1],
'ap': self.access_points[i],
'network': self.wifi_networks[i],
'channel': self.wifi_networks[i]['channel'],
'iperf_server': self.iperf_servers[i],
'attenuator': self.attenuators[i + 1],
'ether_int': self.packet_senders[i],
'iperf_client':
ipc.IPerfClientOverAdb(self.android_devices[i + 1])
}
tmp_obj = ObjNew(**tmp_dict)
self.wifi_int_pairs.append(tmp_obj)
##Setup connection between WiFi APs and Phones and get DHCP address
# for the interface
for obj in self.wifi_int_pairs:
brconfigs = setup_ap_connection(obj.dut, obj.network, obj.ap)
iperf_server_address = wputils.wait_for_dhcp(
obj.ether_int.interface)
setattr(obj, 'server_address', iperf_server_address)
setattr(obj, 'brconfigs', brconfigs)
obj.attenuator.set_atten(MAX_ATTENUATION)
# Enable BQR on master and slave Android device
btutils.enable_bqr(self.dut)
btutils.enable_bqr(self.bt_device_controller)
def teardown_class(self):
super().teardown_class()
for obj in self.wifi_int_pairs:
obj.ap.bridge.teardown(obj.brconfigs)
self.log.info('Stop IPERF server at port {}'.format(
obj.iperf_server.port))
obj.iperf_server.stop()
self.log.info('Stop IPERF process on {}'.format(obj.dut.serial))
#only for glinux machine
# wputils.bring_down_interface(obj.ether_int.interface)
obj.attenuator.set_atten(MAX_ATTENUATION)
obj.ap.close()
def teardown_test(self):
super().teardown_test()
for obj in self.wifi_int_pairs:
obj.attenuator.set_atten(MAX_ATTENUATION)
def play_and_record_audio(self, duration, queue):
"""Play and record audio for a set duration.
Args:
duration: duration in seconds for music playing
que: multiprocess que to store the return value of this function
Returns:
audio_captured: captured audio file path
"""
self.log.info('Play and record audio for {} second'.format(duration))
self.media.play()
self.audio_device.start()
time.sleep(duration)
audio_captured = self.audio_device.stop()
self.media.stop()
self.log.info('Audio play and record stopped')
asserts.assert_true(audio_captured, 'Audio not recorded')
queue.put(audio_captured)
def locate_interference_pair_by_channel(self, interference_channels):
"""Function to find which attenautor to set based on channel info
Args:
interference_channels: list of interference channels
Return:
interference_pair_indices: list of indices for interference pair
in self.wifi_int_pairs
"""
interference_pair_indices = []
for chan in interference_channels:
for i in range(len(self.wifi_int_pairs)):
if self.wifi_int_pairs[i].channel == chan:
interference_pair_indices.append(i)
return interference_pair_indices
def get_interference_rssi(self):
"""Function to read wifi interference RSSI level."""
bssids = []
self.interference_rssi = []
wutils.wifi_toggle_state(self.android_devices[0], True)
for item in self.wifi_int_pairs:
ssid = item.network['SSID']
bssid = item.ap.get_bssid_from_ssid(ssid, '2g')
bssids.append(bssid)
interference_rssi_dict = {
"ssid": ssid,
"bssid": bssid,
"chan": item.channel,
"rssi": 0
}
self.interference_rssi.append(interference_rssi_dict)
scaned_rssi = wpeutils.get_scan_rssi(self.android_devices[0],
bssids,
num_measurements=2)
for item in self.interference_rssi:
item['rssi'] = scaned_rssi[item['bssid']]['mean']
self.log.info('Interference RSSI at channel {} is {} dBm'.format(
item['chan'], item['rssi']))
wutils.wifi_toggle_state(self.android_devices[0], False)