blob: bf653af753a6861c8c1df7a2a5fed5f5831273e3 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import math
import os
import re
import time
import acts.controllers.power_monitor as power_monitor_lib
import acts.controllers.iperf_server as ipf
from acts import asserts
from acts import base_test
from acts import utils
from acts.metrics.loggers.blackbox import BlackboxMetricLogger
from acts_contrib.test_utils.power.loggers.power_metric_logger import PowerMetricLogger
from acts_contrib.test_utils.power import plot_utils
RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
IPERF_TIMEOUT = 180
THRESHOLD_TOLERANCE_DEFAULT = 0.2
GET_FROM_PHONE = 'get_from_dut'
GET_FROM_AP = 'get_from_ap'
PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2
MONSOON_MAX_CURRENT = 8.0
DEFAULT_MONSOON_FREQUENCY = 500
ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
TEMP_FILE = '/sdcard/Download/tmp.log'
class ObjNew(object):
"""Create a random obj with unknown attributes and value.
"""
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def __contains__(self, item):
"""Function to check if one attribute is contained in the object.
Args:
item: the item to check
Return:
True/False
"""
return hasattr(self, item)
class PowerBaseTest(base_test.BaseTestClass):
"""Base class for all wireless power related tests.
"""
def __init__(self, controllers):
super().__init__(controllers)
self.power_result = BlackboxMetricLogger.for_test_case(
metric_name='avg_power')
self.start_meas_time = 0
self.rockbottom_script = None
self.img_name = ''
self.dut = None
self.power_logger = PowerMetricLogger.for_test_case()
self.power_monitor = None
@property
def final_test(self):
return len(
self.results.requested
) > 0 and self.current_test_name == self.results.requested[-1]
@property
def display_name_test_suite(self):
return getattr(self, '_display_name_test_suite',
self.__class__.__name__)
@display_name_test_suite.setter
def display_name_test_suite(self, name):
self._display_name_test_suite = name
@property
def display_name_test_case(self):
default_test_name = getattr(self, 'test_name', None)
return getattr(self, '_display_name_test_case', default_test_name)
@display_name_test_case.setter
def display_name_test_case(self, name):
self._display_name_test_case = name
def initialize_power_monitor(self):
""" Initializes the power monitor object.
Raises an exception if there are no controllers available.
"""
if hasattr(self, 'monsoons'):
self.power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade(
self.monsoons[0])
self.monsoons[0].set_max_current(8.0)
self.monsoons[0].set_voltage(self.mon_voltage)
else:
raise RuntimeError('No power monitors available.')
def setup_class(self):
super().setup_class()
self.log = logging.getLogger()
self.tests = self.get_existing_test_names()
# Obtain test parameters from user_params
TEST_PARAMS = self.TAG + '_params'
self.test_params = self.user_params.get(TEST_PARAMS, {})
if not self.test_params:
self.log.warning(TEST_PARAMS + ' was not found in the user '
'parameters defined in the config file.')
# Override user_param values with test parameters
self.user_params.update(self.test_params)
# Unpack user_params with default values. All the usages of user_params
# as self attributes need to be included either as a required parameter
# or as a parameter with a default value.
req_params = ['custom_files', 'mon_duration']
self.unpack_userparams(req_params,
mon_freq=DEFAULT_MONSOON_FREQUENCY,
mon_offset=0,
bug_report=False,
extra_wait=None,
iperf_duration=None,
pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT)
# Setup the must have controllers, phone and monsoon
self.dut = self.android_devices[0]
self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
os.makedirs(self.mon_data_path, exist_ok=True)
# Initialize the power monitor object that will be used to measure
self.initialize_power_monitor()
# Unpack the thresholds file or fail class setup if it can't be found
for file in self.custom_files:
if 'pass_fail_threshold_' + self.dut.model in file:
self.threshold_file = file
break
else:
raise RuntimeError('Required test pass/fail threshold file is '
'missing')
# Unpack the rockbottom script or fail class setup if it can't be found
for file in self.custom_files:
if 'rockbottom_' + self.dut.model in file:
self.rockbottom_script = file
break
else:
raise RuntimeError('Required rockbottom script is missing.')
# Unpack optional custom files
for file in self.custom_files:
if 'attenuator_setting' in file:
self.attenuation_file = file
elif 'network_config' in file:
self.network_file = file
if hasattr(self, 'attenuators'):
self.num_atten = self.attenuators[0].instrument.num_atten
self.atten_level = self.unpack_custom_file(self.attenuation_file)
self.threshold = self.unpack_custom_file(self.threshold_file)
self.mon_info = self.create_monsoon_info()
# Sync device time, timezone and country code
utils.require_sl4a((self.dut, ))
utils.sync_device_time(self.dut)
screen_on_img = self.user_params.get('screen_on_img', [])
if screen_on_img:
img_src = screen_on_img[0]
img_dest = '/sdcard/Pictures/'
success = self.dut.push_system_file(img_src, img_dest)
if success:
self.img_name = os.path.basename(img_src)
def setup_test(self):
"""Set up test specific parameters or configs.
"""
super().setup_test()
# Reset result variables
self.avg_current = 0
self.samples = []
self.power_result.metric_value = 0
# Set the device into rockbottom state
self.dut_rockbottom()
# Wait for extra time if needed for the first test
if self.extra_wait:
self.more_wait_first_test()
def teardown_test(self):
"""Tear down necessary objects after test case is finished.
"""
self.log.info('Tearing down the test case')
self.power_monitor.connect_usb()
self.power_logger.set_avg_power(self.power_result.metric_value)
self.power_logger.set_avg_current(self.avg_current)
self.power_logger.set_voltage(self.mon_voltage)
self.power_logger.set_testbed(self.testbed_name)
# If a threshold was provided, log it in the power proto
if self.threshold and self.test_name in self.threshold:
avg_current_threshold = self.threshold[self.test_name]
self.power_logger.set_avg_current_threshold(avg_current_threshold)
build_id = self.dut.build_info.get('build_id', '')
incr_build_id = self.dut.build_info.get('incremental_build_id', '')
branch = self.user_params.get('branch', '')
target = self.dut.device_info.get('flavor', '')
self.power_logger.set_branch(branch)
self.power_logger.set_build_id(build_id)
self.power_logger.set_incremental_build_id(incr_build_id)
self.power_logger.set_target(target)
# Log the display name of the test suite and test case
if self.display_name_test_suite:
name = self.display_name_test_suite
self.power_logger.set_test_suite_display_name(name)
if self.display_name_test_case:
name = self.display_name_test_case
self.power_logger.set_test_case_display_name(name)
# Take Bugreport
if self.bug_report:
begin_time = utils.get_current_epoch_time()
self.dut.take_bug_report(self.test_name, begin_time)
# Allow the device to cooldown before executing the next test
cooldown = self.test_params.get('cooldown', None)
if cooldown and not self.final_test:
time.sleep(cooldown)
def teardown_class(self):
"""Clean up the test class after tests finish running
"""
self.log.info('Tearing down the test class')
if self.power_monitor:
self.power_monitor.connect_usb()
def on_fail(self, test_name, begin_time):
self.power_logger.set_pass_fail_status('FAIL')
def on_pass(self, test_name, begin_time):
self.power_logger.set_pass_fail_status('PASS')
def dut_rockbottom(self):
"""Set the dut to rockbottom state
"""
# The rockbottom script might include a device reboot, so it is
# necessary to stop SL4A during its execution.
self.dut.stop_services()
self.log.info('Executing rockbottom script for ' + self.dut.model)
os.chmod(self.rockbottom_script, 0o777)
os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial,
self.img_name))
# Make sure the DUT is in root mode after coming back
self.dut.root_adb()
# Restart SL4A
self.dut.start_services()
def unpack_custom_file(self, file, test_specific=True):
"""Unpack the pass_fail_thresholds from a common file.
Args:
file: the common file containing pass fail threshold.
test_specific: if True, returns the JSON element within the file
that starts with the test class name.
"""
with open(file, 'r') as f:
params = json.load(f)
if test_specific:
try:
return params[self.TAG]
except KeyError:
pass
else:
return params
def decode_test_configs(self, attrs, indices):
"""Decode the test config/params from test name.
Remove redundant function calls when tests are similar.
Args:
attrs: a list of the attrs of the test config obj
indices: a list of the location indices of keyword in the test name.
"""
# Decode test parameters for the current test
test_params = self.current_test_name.split('_')
values = [test_params[x] for x in indices]
config_dict = dict(zip(attrs, values))
self.test_configs = ObjNew(**config_dict)
def more_wait_first_test(self):
# For the first test, increase the offset for longer wait time
if self.current_test_name == self.tests[0]:
self.mon_info.offset = self.mon_offset + self.extra_wait
else:
self.mon_info.offset = self.mon_offset
def set_attenuation(self, atten_list):
"""Function to set the attenuator to desired attenuations.
Args:
atten_list: list containing the attenuation for each attenuator.
"""
if len(atten_list) != self.num_atten:
raise Exception('List given does not have the correct length')
for i in range(self.num_atten):
self.attenuators[i].set_atten(atten_list[i])
def measure_power_and_validate(self):
"""The actual test flow and result processing and validate.
"""
self.collect_power_data()
self.pass_fail_check(self.avg_current)
def collect_power_data(self):
"""Measure power, plot and take log if needed.
Returns:
A MonsoonResult object.
"""
# Collecting current measurement data and plot
samples = self.power_monitor_data_collect_save()
current = [sample[1] for sample in samples]
average_current = sum(current) * 1000 / len(current)
self.power_result.metric_value = (average_current * self.mon_voltage)
self.avg_current = average_current
plot_title = '{}_{}_{}'.format(self.test_name, self.dut.model,
self.dut.build_info['build_id'])
plot_utils.current_waveform_plot(samples, self.mon_voltage,
self.mon_info.data_path, plot_title)
return samples
def pass_fail_check(self, average_current=None):
"""Check the test result and decide if it passed or failed.
The threshold is provided in the config file. In this class, result is
current in mA.
"""
if not self.threshold or self.test_name not in self.threshold:
self.log.error("No threshold is provided for the test '{}' in "
"the configuration file.".format(self.test_name))
return
current_threshold = self.threshold[self.test_name]
if average_current:
asserts.assert_true(
abs(average_current - current_threshold) / current_threshold <
self.pass_fail_tolerance,
'Measured average current in [{}]: {:.2f}mA, which is '
'out of the acceptable range {:.2f}±{:.2f}mA'.format(
self.test_name, average_current, current_threshold,
self.pass_fail_tolerance * current_threshold))
asserts.explicit_pass(
'Measurement finished for [{}]: {:.2f}mA, which is '
'within the acceptable range {:.2f}±{:.2f}'.format(
self.test_name, average_current, current_threshold,
self.pass_fail_tolerance * current_threshold))
else:
asserts.fail(
'Something happened, measurement is not complete, test failed')
def create_monsoon_info(self):
"""Creates the config dictionary for monsoon
Returns:
mon_info: Dictionary with the monsoon packet config
"""
mon_info = ObjNew(freq=self.mon_freq,
duration=self.mon_duration,
offset=self.mon_offset,
data_path=self.mon_data_path)
return mon_info
def power_monitor_data_collect_save(self):
"""Current measurement and save the log file.
Collect current data using Monsoon box and return the path of the
log file. Take bug report if requested.
Returns:
A list of tuples in which the first element is a timestamp and the
second element is the sampled current in Amperes at that time.
"""
tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
self.dut.build_info['build_id'])
data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))
# If the specified Monsoon data file already exists (e.g., multiple
# measurements in a single test), write the results to a new file with
# the postfix "_#".
if os.path.exists(data_path):
highest_value = 1
for filename in os.listdir(os.path.dirname(data_path)):
match = re.match(r'{}_(\d+).txt'.format(tag), filename)
if match:
highest_value = max(highest_value, int(match.group(1)))
data_path = os.path.join(self.mon_info.data_path,
'%s_%s.txt' % (tag, highest_value + 1))
# Resets the battery status right before the test starts.
self.dut.adb.shell(RESET_BATTERY_STATS)
self.log.info('Starting power measurement. Duration: {}s. Offset: '
'{}s. Voltage: {} V.'.format(self.mon_info.duration,
self.mon_info.offset,
self.mon_voltage))
# TODO(b/155426729): Create an accurate host-to-device time difference
# measurement.
device_time_cmd = 'echo $EPOCHREALTIME'
device_time = self.dut.adb.shell(device_time_cmd)
host_time = time.time()
self.log.debug('device start time %s, host start time %s', device_time,
host_time)
device_to_host_offset = float(device_time) - host_time
# Start the power measurement using monsoon.
self.dut.stop_services()
time.sleep(1)
self.power_monitor.disconnect_usb()
measurement_args = dict(duration=self.mon_info.duration,
measure_after_seconds=self.mon_info.offset,
hz=self.mon_info.freq)
self.power_monitor.measure(measurement_args=measurement_args,
measurement_name=self.test_name,
start_time=device_to_host_offset,
monsoon_output_path=data_path)
self.power_monitor.release_resources()
self.power_monitor.connect_usb()
self.dut.wait_for_boot_completion()
time.sleep(10)
self.dut.start_services()
return self.power_monitor.get_waveform(file_path=data_path)
def process_iperf_results(self):
"""Get the iperf results and process.
Returns:
throughput: the average throughput during tests.
"""
# Get IPERF results and add this to the plot title
RESULTS_DESTINATION = os.path.join(
self.iperf_server.log_path,
'iperf_client_output_{}.log'.format(self.current_test_name))
self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION)
# Calculate the average throughput
if self.use_client_output:
iperf_file = RESULTS_DESTINATION
else:
iperf_file = self.iperf_server.log_files[-1]
try:
iperf_result = ipf.IPerfResult(iperf_file)
# Compute the throughput in Mbit/s
throughput = (math.fsum(
iperf_result.instantaneous_rates[self.start_meas_time:-1]
) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1])
) * 8 * (1.024**2)
self.log.info('The average throughput is {}'.format(throughput))
except ValueError:
self.log.warning('Cannot get iperf result. Setting to 0')
throughput = 0
return throughput