blob: 22c2b2f61df240c46600bdb53f3215d01733121c [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import json
import random
import sys
import logging
import re
from acts.base_test import BaseTestClass
from acts_contrib.test_utils.bt.BtInterferenceBaseTest import inject_static_wifi_interference
from acts_contrib.test_utils.bt.BtInterferenceBaseTest import unpack_custom_file
from acts_contrib.test_utils.power.PowerBaseTest import ObjNew
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wpeutils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
import time
MAX_ATTENUATION = 95
INIT_ATTEN = 0
SCAN = 'wpa_cli scan'
SCAN_RESULTS = 'wpa_cli scan_results'
class InjectWifiInterferenceTest(BaseTestClass):
def __init__(self, configs):
super().__init__(configs)
req_params = ['custom_files', 'wifi_networks']
self.unpack_userparams(req_params)
for file in self.custom_files:
if 'static_interference' in file:
self.static_wifi_interference = unpack_custom_file(file)
elif 'dynamic_interference' in file:
self.dynamic_wifi_interference = unpack_custom_file(file)
def setup_class(self):
self.dut = self.android_devices[0]
# Set attenuator to minimum attenuation
if hasattr(self, 'attenuators'):
self.attenuator = self.attenuators[0]
self.attenuator.set_atten(INIT_ATTEN)
self.wifi_int_pairs = []
for i in range(len(self.attenuators) - 1):
tmp_dict = {
'attenuator': self.attenuators[i + 1],
'network': self.wifi_networks[i],
'channel': self.wifi_networks[i]['channel']
}
tmp_obj = ObjNew(**tmp_dict)
self.wifi_int_pairs.append(tmp_obj)
##Setup connection between WiFi APs and Phones and get DHCP address
# for the interface
for obj in self.wifi_int_pairs:
obj.attenuator.set_atten(INIT_ATTEN)
def setup_test(self):
self.log.info("Setup test initiated")
def teardown_class(self):
for obj in self.wifi_int_pairs:
obj.attenuator.set_atten(MAX_ATTENUATION)
def teardown_test(self):
for obj in self.wifi_int_pairs:
obj.attenuator.set_atten(MAX_ATTENUATION)
def test_inject_static_wifi_interference(self):
condition = True
while condition:
attenuation = [
int(x) for x in input(
"Please enter 4 channel attenuation value followed by comma :\n"
).split(',')
]
self.set_atten_all_channel(attenuation)
# Read interference RSSI
self.interference_rssi = get_interference_rssi(
self.dut, self.wifi_int_pairs)
self.log.info('Under the WiFi interference condition: '
'channel 1 RSSI: {} dBm, '
'channel 6 RSSI: {} dBm'
'channel 11 RSSI: {} dBm'.format(
self.interference_rssi[0]['rssi'],
self.interference_rssi[1]['rssi'],
self.interference_rssi[2]['rssi']))
condition = True
return True
def test_inject_dynamic_interface(self):
atten = int(input("Please enter the attenuation level for CHAN1 :"))
self.attenuator.set_atten(atten)
self.log.info("Attenuation for CHAN1 set to:{} dB".format(atten))
interference_rssi = None
self.channel_change_interval = self.dynamic_wifi_interference[
'channel_change_interval_second']
self.wifi_int_levels = list(
self.dynamic_wifi_interference['interference_level'].keys())
for wifi_level in self.wifi_int_levels:
interference_atten_level = self.dynamic_wifi_interference[
'interference_level'][wifi_level]
all_pair = range(len(self.wifi_int_pairs))
# Set initial WiFi interference at channel 1
logging.info('Start with interference at channel 1')
self.wifi_int_pairs[0].attenuator.set_atten(
interference_atten_level)
self.wifi_int_pairs[1].attenuator.set_atten(MAX_ATTENUATION)
self.wifi_int_pairs[2].attenuator.set_atten(MAX_ATTENUATION)
current_int_pair = [0]
inactive_int_pairs = [
item for item in all_pair if item not in current_int_pair
]
logging.info(
'Inject random changing channel (1,6,11) wifi interference'
'every {} second'.format(self.channel_change_interval))
while True:
current_int_pair = [
random.randint(inactive_int_pairs[0],
inactive_int_pairs[1])
]
inactive_int_pairs = [
item for item in all_pair if item not in current_int_pair
]
self.wifi_int_pairs[current_int_pair[0]].attenuator.set_atten(
interference_atten_level)
logging.info('Current interference at channel {}'.format(
self.wifi_int_pairs[current_int_pair[0]].channel))
for i in inactive_int_pairs:
self.wifi_int_pairs[i].attenuator.set_atten(
MAX_ATTENUATION)
# Read interference RSSI
self.interference_rssi = get_interference_rssi(
self.dut, self.wifi_int_pairs)
self.log.info('Under the WiFi interference condition: '
'channel 1 RSSI: {} dBm, '
'channel 6 RSSI: {} dBm'
'channel 11 RSSI: {} dBm'.format(
self.interference_rssi[0]['rssi'],
self.interference_rssi[1]['rssi'],
self.interference_rssi[2]['rssi']))
time.sleep(self.channel_change_interval)
return True
def set_atten_all_channel(self, attenuation):
self.attenuators[0].set_atten(attenuation[0])
self.attenuators[1].set_atten(attenuation[1])
self.attenuators[2].set_atten(attenuation[2])
self.attenuators[3].set_atten(attenuation[3])
self.log.info(
"Attenuation set to CHAN1:{},CHAN2:{},CHAN3:{},CHAN4:{}".format(
self.attenuators[0].get_atten(),
self.attenuators[1].get_atten(),
self.attenuators[2].get_atten(),
self.attenuators[3].get_atten()))
def get_interference_rssi(dut, wifi_int_pairs):
"""Function to read wifi interference RSSI level."""
bssids = []
interference_rssi = []
wutils.wifi_toggle_state(dut, True)
for item in wifi_int_pairs:
ssid = item.network['SSID']
bssid = item.network['bssid']
bssids.append(bssid)
interference_rssi_dict = {
"ssid": ssid,
"bssid": bssid,
"chan": item.channel,
"rssi": 0
}
interference_rssi.append(interference_rssi_dict)
scaned_rssi = wpeutils.get_scan_rssi(dut, bssids, num_measurements=2)
for item in interference_rssi:
item['rssi'] = scaned_rssi[item['bssid']]['mean']
logging.info('Interference RSSI at channel {} is {} dBm'.format(
item['chan'], item['rssi']))
wutils.wifi_toggle_state(dut, False)
return interference_rssi