blob: 6e2babe44d01ec1a996f96d604eb72fd85c72d1b [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import pprint
import time
import acts.signals
import acts_contrib.test_utils.wifi.wifi_test_utils as wutils
from acts import asserts
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
from acts.controllers import iperf_server as ipf
from acts.controllers import attenuator
from acts.controllers.sl4a_lib import rpc_client
import json
import logging
import math
import os
from acts import utils
import csv
import serial
import sys
import urllib.request
from acts_contrib.test_utils.wifi import wifi_performance_test_utils_RSSI as wperfutils
WifiEnums = wutils.WifiEnums
class WifiRvrTwTest(WifiBaseTest):
""" Tests for wifi RVR performance.
Test Bed Requirement:
* One Android device
* Wi-Fi networks visible to the device
"""
TEST_TIMEOUT = 10
IPERF_SETUP_TIME = 5
TURN_TABLE_SETUP_TIME = 5
def __init__(self, controllers):
WifiBaseTest.__init__(self, controllers)
def setup_class(self):
self.dut = self.android_devices[0]
req_params = ["rvr_networks", "rvr_test_params", "attenuators"]
opt_params = ["angle_params", "usb_port"]
self.unpack_userparams(
req_param_names=req_params, opt_param_names=opt_params)
asserts.assert_true(
len(self.rvr_networks) > 0, "Need at least one network.")
if "rvr_test_params" in self.user_params:
self.iperf_server = self.iperf_servers[0]
self.maxdb = self.rvr_test_params["rvr_atten_maxdb"]
self.mindb = self.rvr_test_params["rvr_atten_mindb"]
self.stepdb = self.rvr_test_params["rvr_atten_step"]
self.country_code = self.rvr_test_params["country_code"]
if "angle_params" in self.user_params:
self.angle_list = self.angle_params
if "usb_port" in self.user_params:
self.turntable_port = self.read_comport(self.usb_port["turntable"])
# Init DUT
wutils.wifi_test_device_init(self.dut, self.country_code)
self.dut.droid.bluetoothToggleState(False)
utils.set_location_service(self.dut, False)
wutils.wifi_toggle_state(self.dut, True)
utils.subprocess.check_output(
"adb root", shell=True, timeout=self.TEST_TIMEOUT)
utils.subprocess.check_output(
"adb shell settings put system screen_off_timeout 18000000",
shell=True,
timeout=self.TEST_TIMEOUT)
utils.subprocess.check_output(
"adb shell svc power stayon true",
shell=True,
timeout=self.TEST_TIMEOUT)
# create folder for rvr test result
self.log_path = os.path.join(logging.log_path, "rvr_results")
utils.create_dir(self.log_path)
Header = ("Test_date", "Project", "Device_SN", "ROM", "HW_Stage",
"test_SSID", "Frequency", "Turn_table_orientation",
"Attenuate_dB", "Signal_poll_avg_rssi", "Chain_0_rssi",
"Chain_1_rssi", "Link_speed", "TX_throughput_Mbps",
"RX_throughput_Mbps", "HE_Capable", "Country_code", "Channel",
"WiFi_chip", "Type", "Host_name", "AP_model",
"Incremental_build_id", "Build_type", "TCP_UDP_Protocol",
"Security_type", "Test_tool", "Airplane_mode_status", "BT_status",
"Bug_ID", "Comment")
self.csv_write(Header)
def setup_test(self):
self.dut.droid.wakeLockAcquireBright()
self.dut.droid.wakeUpNow()
rom_info = self.get_rominfo()
self.testdate = time.strftime("%Y-%m-%d", time.localtime())
self.rom = rom_info[0]
self.build_id = rom_info[1]
self.build_type = rom_info[2]
self.project = rom_info[3]
self.ret_country_code = self.get_country_code()
self.ret_hw_stage = self.get_hw_stage()
self.ret_platform = wperfutils.detect_wifi_platform(self.dut)
def teardown_test(self):
self.dut.droid.wakeLockRelease()
self.dut.droid.goToSleepNow()
wutils.set_attns(self.attenuators, "default")
def teardown_class(self):
if "rvr_test_params" in self.user_params:
self.iperf_server.stop()
def on_fail(self, test_name, begin_time):
self.dut.take_bug_report(test_name, begin_time)
self.dut.cat_adb_log(test_name, begin_time)
"""Helper Functions"""
def csv_write(self, data):
"""Output .CSV file for test result.
Args:
data: Dict containing attenuation, throughput and other meta data.
"""
with open(
"{}/Result.csv".format(self.log_path), "a", newline="") as csv_file:
csv_writer = csv.writer(csv_file, delimiter=",")
csv_writer.writerow(data)
csv_file.close()
def set_atten(self, db):
"""Setup attenuator dB for current test.
Args:
db: Attenuator setup dB.
"""
if db < 0:
db = 0
elif db > 95:
db = 95
self.log.info("[Attenuation] %s", "Set dB = " + str(db) + "dB")
for atten in self.attenuators:
atten.set_atten(db)
self.log.info("[Attenuation] %s",
"Current dB = " + str(atten.get_atten()) + "dB")
retry = 0
while atten.get_atten() != db and retry < 11:
retry = retry + 1
self.log.info(
"[Attenuation] %s", "Fail to set Attenuator to " + str(db) + ", " +
str(retry) + " times try to reset")
self.set_atten(db)
if retry == 11:
self.log.info("Attenuation] %s",
"Retry Attenuator fail for 10 cycles, end test!")
sys.exit()
def read_comport(self, com):
"""Read com port for current test.
Args:
com: Serial port.
Returns:
port: Serial port with baud rate.
"""
port = serial.Serial(com, 9600, timeout=1)
time.sleep(1)
return port
def get_angle(self, port):
"""Get turn table angle for current test.
Args:
port: Turn table com port.
Returns:
angle: Angle from turn table.
"""
angle = ""
port.write("DG?;".encode())
time.sleep(0.1)
degree_data = port.readline().decode("utf-8")
for data in range(len(degree_data)):
if (degree_data[data].isdigit()) is True:
angle = angle + degree_data[data]
if angle == "":
return -1
return int(angle)
def set_angle(self, port, angle):
"""Setup turn table angle for current test.
Args:
port: Turn table com port
angle: Turn table setup angle
"""
if angle > 359:
angle = 359
elif angle < 0:
angle = 0
self.log.info("Set angle to " + str(angle))
input_angle = str("DG") + str(angle) + str(";")
port.write(input_angle.encode())
time.sleep(self.TURN_TABLE_SETUP_TIME)
def check_angle(self, port, angle):
"""Check turn table angle for current test.
Args:
port: Turn table com port
angle: Turn table setup angle
"""
retrytime = self.TEST_TIMEOUT
retry = 0
while self.get_angle(port) != angle and retry < retrytime:
retry = retry + 1
self.log.info("Turntable] %s",
"Current angle = " + str(self.get_angle(port)))
self.log.info(
"Turntable] %s", "Fail set angle to " + str(angle) + ", " +
str(retry) + " times try to reset")
self.set_angle(port, angle)
time.sleep(self.TURN_TABLE_SETUP_TIME)
if retry == retrytime:
self.log.info(
"Turntable] %s",
"Retry turntable fail for " + str(retry) + " cycles, end test!")
sys.exit()
def get_wifiinfo(self):
"""Get WiFi RSSI/ link speed/ frequency for current test.
Returns:
[rssi,link_speed,frequency]: DUT WiFi RSSI,Link speed and Frequency.
"""
def is_number(string):
for i in string:
if i.isdigit() is False:
if (i == "-" or i == "."):
continue
return str(-1)
return string
try:
cmd = "adb shell iw wlan0 link"
wifiinfo = utils.subprocess.check_output(
cmd, shell=True, timeout=self.TEST_TIMEOUT)
# Check RSSI Enhance
rssi = self.get_rssi_func()
# Check link speed
link_speed = wifiinfo.decode(
"utf-8")[wifiinfo.decode("utf-8").find("bitrate:") +
8:wifiinfo.decode("utf-8").find("Bit/s") - 2]
link_speed = link_speed.strip(" ")
link_speed = is_number(link_speed)
# Check frequency
frequency = wifiinfo.decode(
"utf-8")[wifiinfo.decode("utf-8").find("freq:") +
6:wifiinfo.decode("utf-8").find("freq:") + 10]
frequency = frequency.strip(" ")
frequency = is_number(frequency)
except:
return -1, -1, -1
return [rssi, link_speed, frequency]
def get_rssi_func(self):
"""Get RSSI from brcm/qcom wifi chip.
Returns:
current_rssi: DUT WiFi RSSI.
"""
if self.ret_platform == "brcm":
rssi_future = wperfutils.get_connected_rssi_brcm(self.dut)
signal_poll_avg_rssi_tmp = rssi_future.pop("signal_poll_avg_rssi").pop(
"mean")
chain_0_rssi_tmp = rssi_future.pop("chain_0_rssi").pop("mean")
chain_1_rssi_tmp = rssi_future.pop("chain_1_rssi").pop("mean")
current_rssi = {
"signal_poll_avg_rssi": signal_poll_avg_rssi_tmp,
"chain_0_rssi": chain_0_rssi_tmp,
"chain_1_rssi": chain_1_rssi_tmp
}
elif self.ret_platform == "qcom":
rssi_future = wperfutils.get_connected_rssi_qcom(
self.dut, interface="wlan0")
signal_poll_avg_rssi_tmp = rssi_future.pop("signal_poll_avg_rssi").pop(
"mean")
chain_0_rssi_tmp = rssi_future.pop("chain_0_rssi").pop("mean")
chain_1_rssi_tmp = rssi_future.pop("chain_1_rssi").pop("mean")
if math.isnan(signal_poll_avg_rssi_tmp):
signal_poll_avg_rssi_tmp = -1
if math.isnan(chain_0_rssi_tmp):
chain_0_rssi_tmp = -1
if math.isnan(chain_1_rssi_tmp):
chain_1_rssi_tmp = -1
if signal_poll_avg_rssi_tmp == -1 & chain_0_rssi_tmp == -1 & chain_1_rssi_tmp == -1:
current_rssi = -1
else:
current_rssi = {
"signal_poll_avg_rssi": signal_poll_avg_rssi_tmp,
"chain_0_rssi": chain_0_rssi_tmp,
"chain_1_rssi": chain_1_rssi_tmp
}
else:
current_rssi = {
"signal_poll_avg_rssi": float("nan"),
"chain_0_rssi": float("nan"),
"chain_1_rssi": float("nan")
}
return current_rssi
def get_rominfo(self):
"""Get DUT ROM build info.
Returns:
rom, build_id, build_type, project: DUT Build info,Build ID,
Build type, and Project name
"""
rom = "NA"
build_id = "NA"
build_type = "NA"
project = "NA"
rominfo = self.dut.adb.shell("getprop ro.build.display.id").split()
if rominfo:
rom = rominfo[2]
build_id = rominfo[3]
project, build_type = rominfo[0].split("-")
return rom, build_id, build_type, project
def get_hw_stage(self):
"""Get DUT HW stage.
Returns:
hw_stage: DUT HW stage e.g. EVT/DVT/PVT..etc.
"""
cmd = "adb shell getprop ro.boot.hardware.revision"
hw_stage_temp = utils.subprocess.check_output(
cmd, shell=True, timeout=self.TEST_TIMEOUT)
hw_stage = hw_stage_temp.decode("utf-8").split("\n")[0]
return hw_stage
def get_country_code(self):
"""Get DUT country code.
Returns:
country_code: DUT country code e.g. US/JP/GE..etc.
"""
cmd = "adb shell cmd wifi get-country-code"
country_code_temp = utils.subprocess.check_output(
cmd, shell=True, timeout=self.TEST_TIMEOUT)
country_code = country_code_temp.decode("utf-8").split(" ")[4].split(
"\n")[0]
return country_code
def get_channel(self):
"""Get DUT WiFi channel.
Returns:
country_code: DUT channel e.g. 6/36/37..etc.
"""
if self.ret_platform == "brcm":
cmd = 'adb shell wl assoc | grep "Primary channel:"'
channel_temp = utils.subprocess.check_output(
cmd, shell=True, timeout=self.TEST_TIMEOUT)
channel = channel_temp.decode("utf-8").split(": ")[1].split("\n")[0]
elif self.ret_platform == "qcom":
cmd = "adb shell iw wlan0 info | grep channel"
channel_temp = utils.subprocess.check_output(
cmd, shell=True, timeout=self.TEST_TIMEOUT)
channel = channel_temp.decode("utf-8").split(" ")[1].split("\n")[0]
return channel
def get_he_capable(self):
"""Get DUT WiFi high efficiency capable status .
Returns:
he_capable: DUT high efficiency capable status.
"""
if self.ret_platform == "brcm":
cmd = 'adb shell wl assoc | grep "Chanspec:"'
he_temp = utils.subprocess.check_output(
cmd, shell=True, timeout=self.TEST_TIMEOUT)
he_capable = he_temp.decode("utf-8").split(": ")[1].split("\n")[0].split(
"MHz")[0].split(" ")[3]
elif self.ret_platform == "qcom":
cmd = "adb shell iw wlan0 info | grep channel"
he_temp = utils.subprocess.check_output(
cmd, shell=True, timeout=self.TEST_TIMEOUT)
he_capable = he_temp.decode("utf-8").split("width: ")[1].split(" ")[0]
return he_capable
def post_process_results(self, rvr_result):
"""Saves JSON formatted results.
Args:
rvr_result: Dict containing attenuation, throughput and other meta data
Returns:
wifiinfo[0]: To check WiFi connection by RSSI value
"""
# Save output as text file
wifiinfo = self.get_wifiinfo()
if wifiinfo[0] != -1:
rvr_result["signal_poll_avg_rssi"] = wifiinfo[0]["signal_poll_avg_rssi"]
rvr_result["chain_0_rssi"] = wifiinfo[0]["chain_0_rssi"]
rvr_result["chain_1_rssi"] = wifiinfo[0]["chain_1_rssi"]
else:
rvr_result["signal_poll_avg_rssi"] = wifiinfo[0]
rvr_result["chain_0_rssi"] = wifiinfo[0]
rvr_result["chain_1_rssi"] = wifiinfo[0]
if rvr_result["signal_poll_avg_rssi"] == -1:
rvr_result["channel"] = "NA"
else:
rvr_result["channel"] = self.ret_channel
rvr_result["country_code"] = self.ret_country_code
rvr_result["hw_stage"] = self.ret_hw_stage
rvr_result["wifi_chip"] = self.ret_platform
rvr_result["test_ssid"] = self.ssid
rvr_result["test_angle"] = self.angle_list[self.angle]
rvr_result["test_dB"] = self.db
rvr_result["test_link_speed"] = wifiinfo[1]
rvr_result["test_frequency"] = wifiinfo[2]
data = (
self.testdate,
self.project,
self.dut.serial,
self.rom,
rvr_result["hw_stage"],
rvr_result["test_ssid"],
rvr_result["test_frequency"],
rvr_result["test_angle"],
rvr_result["test_dB"],
rvr_result["signal_poll_avg_rssi"],
rvr_result["chain_0_rssi"],
rvr_result["chain_1_rssi"],
rvr_result["test_link_speed"],
rvr_result["throughput_TX"][0],
rvr_result["throughput_RX"][0],
"HE" + self.he_capable,
rvr_result["country_code"],
rvr_result["channel"],
rvr_result["wifi_chip"],
"OTA_RvR",
"OTA_Testbed2",
"RAXE500",
self.build_id,
self.build_type,
"TCP",
"WPA3",
"iperf3",
"OFF",
"OFF",
)
self.csv_write(data)
results_file_path = "{}/{}_angle{}_{}dB.json".format(
self.log_path, self.ssid, self.angle_list[self.angle], self.db)
with open(results_file_path, "w") as results_file:
json.dump(rvr_result, results_file, indent=4)
return wifiinfo[0]
def connect_to_wifi_network(self, network):
"""Connection logic for wifi networks.
Args:
params: Dictionary with network info.
"""
ssid = network[WifiEnums.SSID_KEY]
self.dut.ed.clear_all_events()
wutils.start_wifi_connection_scan(self.dut)
scan_results = self.dut.droid.wifiGetScanResults()
wutils.assert_network_in_list({WifiEnums.SSID_KEY: ssid}, scan_results)
wutils.wifi_connect(self.dut, network, num_of_tries=3)
def run_iperf_init(self, network):
self.iperf_server.start(tag="init")
self.log.info("[Iperf] %s", "Starting iperf traffic init.")
time.sleep(self.IPERF_SETUP_TIME)
try:
port_arg = "-p {} -J -R -t10".format(self.iperf_server.port)
self.dut.run_iperf_client(
self.rvr_test_params["iperf_server_address"],
port_arg,
timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT)
self.iperf_server.stop()
self.log.info("[Iperf] %s", "iperf traffic init Pass")
except:
self.log.warning("ValueError: iperf init ERROR.")
def run_iperf_client(self, network):
"""Run iperf TX throughput after connection.
Args:
network: Dictionary with network info.
Returns:
rvr_result: Dict containing TX rvr_results.
"""
rvr_result = []
try:
self.iperf_server.start(tag="TX_server_{}_angle{}_{}dB".format(
self.ssid, self.angle_list[self.angle], self.db))
ssid = network[WifiEnums.SSID_KEY]
self.log.info("[Iperf] %s",
"Starting iperf traffic TX through {}".format(ssid))
time.sleep(self.IPERF_SETUP_TIME)
port_arg = "-p {} -J {}".format(self.iperf_server.port,
self.rvr_test_params["iperf_port_arg"])
success, data = self.dut.run_iperf_client(
self.rvr_test_params["iperf_server_address"],
port_arg,
timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT)
# Parse and log result
client_output_path = os.path.join(
self.iperf_server.log_path,
"IperfDUT,{},TX_client_{}_angle{}_{}dB".format(
self.iperf_server.port, self.ssid, self.angle_list[self.angle],
self.db))
with open(client_output_path, "w") as out_file:
out_file.write("\n".join(data))
self.iperf_server.stop()
iperf_file = self.iperf_server.log_files[-1]
iperf_result = ipf.IPerfResult(iperf_file)
curr_throughput = (math.fsum(iperf_result.instantaneous_rates[
self.rvr_test_params["iperf_ignored_interval"]:-1]) /
len(iperf_result.instantaneous_rates[
self.rvr_test_params["iperf_ignored_interval"]:-1])
) * 8 * (1.024**2)
rvr_result.append(curr_throughput)
self.log.info(
"[Iperf] %s", "TX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
self.db, curr_throughput))
self.log.debug(pprint.pformat(data))
asserts.assert_true(success, "Error occurred in iPerf traffic.")
return rvr_result
except:
rvr_result = ["NA"]
self.log.warning("ValueError: TX iperf ERROR.")
self.iperf_server.stop()
return rvr_result
def run_iperf_server(self, network):
"""Run iperf RX throughput after connection.
Args:
network: Dictionary with network info.
Returns:
rvr_result: Dict containing RX rvr_results.
"""
rvr_result = []
try:
self.iperf_server.start(tag="RX_client_{}_angle{}_{}dB".format(
self.ssid, self.angle_list[self.angle], self.db))
ssid = network[WifiEnums.SSID_KEY]
self.log.info("[Iperf] %s",
"Starting iperf traffic RX through {}".format(ssid))
time.sleep(self.IPERF_SETUP_TIME)
port_arg = "-p {} -J -R {}".format(self.iperf_server.port,
self.rvr_test_params["iperf_port_arg"])
success, data = self.dut.run_iperf_client(
self.rvr_test_params["iperf_server_address"],
port_arg,
timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT)
# Parse and log result
client_output_path = os.path.join(
self.iperf_server.log_path,
"IperfDUT,{},RX_server_{}_angle{}_{}dB".format(
self.iperf_server.port, self.ssid, self.angle_list[self.angle],
self.db))
with open(client_output_path, "w") as out_file:
out_file.write("\n".join(data))
self.iperf_server.stop()
iperf_file = client_output_path
iperf_result = ipf.IPerfResult(iperf_file)
curr_throughput = (math.fsum(iperf_result.instantaneous_rates[
self.rvr_test_params["iperf_ignored_interval"]:-1]) /
len(iperf_result.instantaneous_rates[
self.rvr_test_params["iperf_ignored_interval"]:-1])
) * 8 * (1.024**2)
rvr_result.append(curr_throughput)
self.log.info(
"[Iperf] %s", "RX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
self.db, curr_throughput))
self.log.debug(pprint.pformat(data))
asserts.assert_true(success, "Error occurred in iPerf traffic.")
return rvr_result
except:
rvr_result = ["NA"]
self.log.warning("ValueError: RX iperf ERROR.")
self.iperf_server.stop()
return rvr_result
def iperf_test_func(self, network):
"""Main function to test iperf TX/RX.
Args:
network: Dictionary with network info.
"""
# Initialize
rvr_result = {}
# Run RvR and log result
rvr_result["throughput_RX"] = self.run_iperf_server(network)
retry_time = 2
for retry in range(retry_time):
if rvr_result["throughput_RX"] == ["NA"]:
if not self.iperf_retry():
time.sleep(self.IPERF_SETUP_TIME)
rvr_result["throughput_RX"] = self.run_iperf_server(network)
else:
break
else:
break
rvr_result["throughput_TX"] = self.run_iperf_client(network)
retry_time = 2
for retry in range(retry_time):
if rvr_result["throughput_TX"] == ["NA"]:
if not self.iperf_retry():
time.sleep(self.IPERF_SETUP_TIME)
rvr_result["throughput_TX"] = self.run_iperf_client(network)
else:
break
else:
break
self.post_process_results(rvr_result)
self.rssi = wifiinfo[0]
return self.rssi
def iperf_retry(self):
"""Check iperf TX/RX status and retry."""
try:
cmd = "adb -s {} shell pidof iperf3| xargs adb shell kill -9".format(
self.dut.serial)
utils.subprocess.call(cmd, shell=True, timeout=self.TEST_TIMEOUT)
self.log.warning("ValueError: Killed DUT iperf process, keep test")
except:
self.log.info("[Iperf] %s", "No iperf DUT process found, keep test")
wifiinfo = self.get_wifiinfo()
print("--[iperf_retry]--", wifiinfo[0])
self.log.info("[WiFiinfo] %s", "Current RSSI = " + str(wifiinfo[0]) + "dBm")
if wifiinfo[0] == -1:
self.log.warning("ValueError: Cannot get RSSI, stop throughput test")
return True
else:
return False
def rvr_test(self, network):
"""Test function to run RvR.
The function runs an RvR test in the current device/AP configuration.
Function is called from another wrapper function that sets up the
testbed for the RvR test
Args:
params: Dictionary with network info
"""
self.ssid = network[WifiEnums.SSID_KEY]
self.log.info("Start rvr test")
for angle in range(len(self.angle_list)):
self.angle = angle
self.set_angle(self.turntable_port, self.angle_list[angle])
self.check_angle(self.turntable_port, self.angle_list[angle])
self.set_atten(0)
self.connect_to_wifi_network(network)
self.ret_channel = self.get_channel()
self.he_capable = self.get_he_capable()
self.run_iperf_init(network)
for db in range(self.mindb, self.maxdb + self.stepdb, self.stepdb):
self.db = db
self.set_atten(self.db)
self.iperf_test_func(network)
if self.rssi == -1:
self.log.warning("ValueError: Cannot get RSSI. Run next angle")
break
else:
continue
wutils.reset_wifi(self.dut)
"""Tests"""
def test_rvr_2g(self):
network = self.rvr_networks[0]
self.rvr_test(network)
def test_rvr_5g(self):
network = self.rvr_networks[1]
self.rvr_test(network)
def test_rvr_6g(self):
network = self.rvr_networks[2]
self.rvr_test(network)