# blob: d2aad85c1ce548db9d67ac584a0d281031faeb5f
#!/usr/bin/env python3.4
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import random
import time
import acts.controllers.packet_capture as packet_capture
import acts.signals as signals
import acts_contrib.test_utils.wifi.rpm_controller_utils as rutils
import acts_contrib.test_utils.wifi.wifi_datastore_utils as dutils
import acts_contrib.test_utils.wifi.wifi_test_utils as wutils
from acts import asserts
from acts.base_test import BaseTestClass
from acts.controllers.ap_lib import hostapd_constants
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
# Module-level shorthand for the WifiEnums namespace exposed by wifi_test_utils.
WifiEnums = wutils.WifiEnums
# Seconds to pause after forgetting a network before the next attempt.
WAIT_BEFORE_CONNECTION = 1

# Band-count markers used to decide when an AP can be released.
SINGLE_BAND, DUAL_BAND = 1, 2

# NOTE(review): TIMEOUT and PING_ADDR are not referenced in the visible part
# of this file - confirm whether they are still needed.
TIMEOUT = 60
PING_ADDR = 'www.google.com'

# Prefix shared by every generated test case name.
TEST = 'test_'

# Number of link probes to send and the delay (seconds) between them.
NUM_LINK_PROBES, PROBE_DELAY_SEC = 3, 1
class WifiChaosTest(WifiBaseTest):
    """Connect/disconnect interop tests against a pool of Wi-Fi IOT APs.

    One test case is generated for every entry of the ``interop_ssid`` user
    param. Each test locks the AP in the datastore, powers it on through the
    RPM, captures packets on a locked Packet Capture device and runs a
    connect/ping/forget loop against the AP.

    Test Bed Requirement:
    * One Android device
    * Wi-Fi IOT networks visible to the device
    """

    def __init__(self, configs):
        """Initialize the class and generate the per-SSID test cases.

        Args:
            configs: Test configuration, forwarded to BaseTestClass.
        """
        # NOTE(review): BaseTestClass.__init__ is called directly, so any
        # WifiBaseTest.__init__ is skipped - confirm this is intentional.
        BaseTestClass.__init__(self, configs)
        self.generate_interop_tests()

    def randomize_testcases(self):
        """Randomize the list of hosts and build a random order of tests,
        based on SSIDs, keeping all the relevant bands of an AP together.

        The result replaces self.tests.
        """
        # Shuffle a copy so the list held in user_params is not reordered.
        hosts = list(self.user_params['interop_host'])
        random.shuffle(hosts)
        shuffled_tests = []
        for host in hosts:
            info = dutils.show_device(host)
            # Based on the information in datastore, add one test per band
            # the AP has. 2G is listed before 5G because interop_base_test
            # only releases a dual band AP when the 5G SSID is tested.
            for band_key in ('ssid_2g', 'ssid_5g'):
                if band_key in info:
                    shuffled_tests.append(TEST + info[band_key])
        self.tests = shuffled_tests

    def generate_interop_testcase(self, base_test, testcase_name, ssid_dict):
        """Generates a single test case from the given data.

        The generated callable is attached to this instance under a name
        starting with 'test_' and that name is appended to self.tests.

        Args:
            base_test: The base test case function to run.
            testcase_name: The name of the test case; also used as the SSID.
            ssid_dict: The information about the network under test, keyed
                       by testcase_name, with 'uuid' and 'host' entries.
        """
        ssid = testcase_name
        ap_entry = ssid_dict[testcase_name]
        test_tracker_uuid = ap_entry['uuid']
        hostname = ap_entry['host']
        if not testcase_name.startswith(TEST):
            testcase_name = TEST + testcase_name
        test_case = test_tracker_info(uuid=test_tracker_uuid)(
            lambda: base_test(ssid, hostname))
        setattr(self, testcase_name, test_case)
        self.tests.append(testcase_name)

    def generate_interop_tests(self):
        """Generate a test case per 'interop_ssid' entry, then shuffle."""
        for ssid_dict in self.user_params['interop_ssid']:
            # Each entry is keyed by the SSID / test case name.
            testcase_name = list(ssid_dict)[0]
            self.generate_interop_testcase(self.interop_base_test,
                                           testcase_name, ssid_dict)
        self.randomize_testcases()

    def setup_class(self):
        """Initialize the DUT and lock a Packet Capture device.

        Raises:
            A test assertion failure if no Packet Capture could be locked.
        """
        super().setup_class()
        self.dut = self.android_devices[0]
        # Random identity used to lock devices in the datastore.
        self.admin = 'admin' + str(random.randint(1000001, 12345678))
        wutils.wifi_test_device_init(self.dut)
        # Set country code explicitly to "US".
        wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
        asserts.assert_true(
            self.lock_pcap(),
            "Could not lock a Packet Capture. Aborting Interop test.")
        wutils.wifi_toggle_state(self.dut, True)

    def lock_pcap(self):
        """Lock a Packet Capture device to use for the test.

        On success self.pcap_host and self.pcap are set.

        Returns:
            True if a Packet Capture device was locked, False otherwise.
        """
        # Get list of devices from the datastore.
        for device in dutils.get_devices():
            if device['ap_label'] != 'PCAP' or device['lock_status']:
                continue
            device_name = device['hostname']
            if not dutils.lock_device(device_name, self.admin):
                self.log.warning("Failed to lock %s PCAP." % device_name)
                continue
            self.pcap_host = device_name
            self.log.info("Locked Packet Capture device: %s" % device_name)
            pcap_config = {
                'ssh_config': {
                    'user': 'root',
                    'host': device['ip_address'],
                },
            }
            self.pcap = packet_capture.PacketCapture(pcap_config)
            return True
        return False

    def setup_test(self):
        """Keep the device awake for the duration of the test."""
        super().setup_test()
        self.dut.droid.wakeLockAcquireBright()
        self.dut.droid.wakeUpNow()

    def _stop_chaos_pcap(self, test_status):
        """Stop the capture started by the current test, if there is one.

        Args:
            test_status: Flag forwarded to wutils.stop_pcap (True when the
                         test passed, False when it failed).
        """
        pcap_procs = getattr(self, 'pcap_procs', None)
        if pcap_procs is None:
            # The test ended before a capture was started.
            return
        # Forget the handles so they cannot be stopped a second time.
        self.pcap_procs = None
        wutils.stop_pcap(self.pcap, pcap_procs, test_status)

    def on_pass(self, test_name, begin_time):
        """Stop the packet capture of a passing test."""
        self._stop_chaos_pcap(True)

    def on_fail(self, test_name, begin_time):
        """Stop the packet capture of a failing test and run base handling."""
        if getattr(self, 'pcap_procs', None) is not None:
            # Sleep to ensure all failed packets are captured.
            time.sleep(5)
        self._stop_chaos_pcap(False)
        super().on_fail(test_name, begin_time)

    def teardown_test(self):
        """Release the wake lock and put the device back to sleep."""
        super().teardown_test()
        self.dut.droid.wakeLockRelease()
        self.dut.droid.goToSleepNow()

    def teardown_class(self):
        """Unlock the Packet Capture device locked in setup_class."""
        pcap_host = getattr(self, 'pcap_host', None)
        if pcap_host is None:
            # lock_pcap never succeeded, so there is nothing to unlock.
            return
        # Unlock the PCAP.
        if not dutils.unlock_device(pcap_host):
            self.log.warning(
                "Failed to unlock %s PCAP. Check in datastore." % pcap_host)

    # Helper Functions

    def scan_and_connect_by_id(self, network, net_id):
        """Scan for network and connect using network id.

        Args:
            network: Dict, network information; its SSID is scanned for.
            net_id: Integer specifying the network id of the network.
        """
        ssid = network[WifiEnums.SSID_KEY]
        wutils.start_wifi_connection_scan_and_ensure_network_found(
            self.dut, ssid)
        wutils.wifi_connect_by_id(self.dut, net_id)

    def run_ping(self, sec):
        """Ping the default gateway for the given number of seconds.

        Args:
            sec: Time in seconds to run the ping traffic.

        Raises:
            TestFailure if the gateway cannot be determined or if every
            ping packet is lost.
        """
        self.log.info("Finding Gateway...")
        route_response = self.dut.adb.shell("ip route get 8.8.8.8")
        gateway = re.search(r'via (\S+) dev', str(route_response))
        if gateway is None:
            raise signals.TestFailure(
                "Could not find gateway in route: %s" % route_response)
        gateway_ip = gateway.group(1)
        self.log.info("Gateway IP = %s" % gateway_ip)
        self.log.info("Running ping for %d seconds" % sec)
        result = self.dut.adb.shell("ping -w %d %s" % (sec, gateway_ip),
                                    timeout=sec + 1)
        self.log.debug("Ping Result = %s" % result)
        if "100% packet loss" in result:
            raise signals.TestFailure("100% packet loss during ping")

    def send_link_probes(self, network):
        """Send link probes and verify the device and AP did not crash.

        Steps:
        1. Send a few link probes.
        2. Ensure that the device and AP did not crash (by checking that the
           device remains connected to the expected network).

        The probe results are only logged; they are not asserted on.

        Args:
            network: Dict, network information of the expected connection.
        """
        results = wutils.send_link_probes(self.dut, NUM_LINK_PROBES,
                                          PROBE_DELAY_SEC)
        self.log.info("Link Probe results: %s" % (results, ))
        wifi_info = self.dut.droid.wifiGetConnectionInfo()
        expected = network[WifiEnums.SSID_KEY]
        actual = wifi_info[WifiEnums.SSID_KEY]
        asserts.assert_equal(
            expected, actual,
            "Device did not remain connected after sending link probes!")

    def unlock_and_turn_off_ap(self, hostname, rpm_port, rpm_ip):
        """Unlock the AP in datastore and turn off the AP.

        Args:
            hostname: Hostname of the AP.
            rpm_port: Port number on the RPM for the AP.
            rpm_ip: RPM's IP address.
        """
        # Un-Lock AP in datastore.
        self.log.debug("Un-lock AP in datastore")
        if not dutils.unlock_device(hostname):
            self.log.warning(
                "Failed to unlock %s AP. Check AP in datastore." % hostname)
        # Turn OFF AP from the RPM port.
        rutils.turn_off_ap(rpm_port, rpm_ip)

    def run_connect_disconnect(self, network, hostname, rpm_port, rpm_ip,
                               release_ap):
        """Run connect/disconnect to a given network in loop.

        Each of the five iterations adds the network, connects to it, pings
        the gateway and forgets the network again.

        Args:
            network: Dict, network information.
            hostname: Hostname of the AP to connect to.
            rpm_port: Port number on the RPM for the AP.
            rpm_ip: RPM's IP address.
            release_ap: Flag to determine if we should turn off the AP yet.

        Raises: TestFailure if the network connection fails.
        """
        # Resolved up front so it is always available to the error path.
        ssid = network[WifiEnums.SSID_KEY]
        for attempt in range(5):
            try:
                # Kept for the bug report TODO in the error path below.
                begin_time = time.time()
                net_id = self.dut.droid.wifiAddNetwork(network)
                asserts.assert_true(net_id != -1,
                                    "Add network %s failed" % network)
                self.log.info("Connecting to %s" % ssid)
                self.scan_and_connect_by_id(network, net_id)
                self.run_ping(10)
                # TODO(b/133369482): uncomment once bug is resolved
                # self.send_link_probes(network)
                wutils.wifi_forget_network(self.dut, ssid)
                time.sleep(WAIT_BEFORE_CONNECTION)
            except Exception as e:
                # Report the attempt 1-based.
                self.log.error("Connection to %s network failed on the %d "
                               "attempt with exception %s." %
                               (ssid, attempt + 1, e))
                # TODO:(bmahadev) Uncomment after scan issue is fixed.
                # self.dut.take_bug_report(ssid, begin_time)
                # self.dut.cat_adb_log(ssid, begin_time)
                if release_ap:
                    self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
                raise signals.TestFailure("Failed to connect to %s" % ssid)

    def get_band_and_chan(self, ssid):
        """Get the band and channel information from SSID.

        The SSID is split on '_'. The last part is stored in self.band and
        the first run of digits of the first part containing 'ch' (after the
        leading router model part) is stored, as a string, in self.chan.

        Args:
            ssid: SSID of the network.

        Raises:
            TestFailure if no channel information is found in the SSID.
        """
        ssid_info = ssid.split('_')
        self.band = ssid_info[-1]
        # Skip over the router model part.
        for item in ssid_info[1:]:
            if 'ch' not in item:
                continue
            digits = re.search(r'\d+', item)
            if digits is not None:
                self.chan = digits.group(0)
                return
        raise signals.TestFailure("Channel information not found in SSID.")

    def interop_base_test(self, ssid, hostname):
        """Base test for all the connect-disconnect interop tests.

        Args:
            ssid: string, SSID of the network to connect to.
            hostname: string, hostname of the AP.

        Steps:
        1. Lock AP in datastore.
        2. Turn on AP on the rpm switch.
        3. Run connect-disconnect in loop.
        4. Turn off AP on the rpm switch.
        5. Unlock AP in datastore.

        Raises:
            TestSkip if the AP is locked by a different admin.
            TestFailure if the AP cannot be locked, the SSID carries no
            channel information or the connection fails.
        """
        network = {}
        network['password'] = 'password'
        network['SSID'] = ssid
        wutils.reset_wifi(self.dut)

        # Lock AP in datastore.
        self.log.info("Lock AP in datastore")
        ap_info = dutils.show_device(hostname)
        # If AP is locked by a different test admin, then we skip.
        if ap_info['lock_status'] and ap_info['locked_by'] != self.admin:
            raise signals.TestSkip("AP %s is locked, skipping test" % hostname)
        if not dutils.lock_device(hostname, self.admin):
            self.log.warning("Failed to lock %s AP. Unlock AP in datastore"
                             " and try again." % hostname)
            raise signals.TestFailure("Failed to lock AP")

        band = SINGLE_BAND
        if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info):
            band = DUAL_BAND
        # Release only if it's a single band AP or we are running the last
        # (5G) band of a dual band AP.
        release_ap = (band == SINGLE_BAND) or ('5G' in ssid)

        # Get AP RPM attributes and Turn ON AP.
        rpm_ip = ap_info['rpm_ip']
        rpm_port = ap_info['rpm_port']
        try:
            rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip)
            self.log.info("Finished turning ON AP.")
            # Experimental. Some APs take upto a min to come online.
            time.sleep(60)
            self.get_band_and_chan(ssid)
            self.pcap.configure_monitor_mode(self.band, self.chan)
            self.pcap_procs = wutils.start_pcap(self.pcap, self.band.lower(),
                                                self.test_name)
        except Exception:
            # Do not leave the AP locked and powered when setup fails.
            # run_connect_disconnect does its own cleanup on failure, so it
            # is kept outside of this block.
            if release_ap:
                self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
            raise

        self.run_connect_disconnect(network, hostname, rpm_port, rpm_ip,
                                    release_ap)
        # Un-lock only if it's a single band AP or we are running the last band.
        if release_ap:
            self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)