blob: d07cb36689cb8c9dfeecd1195c1018ce0e082dee [file] [log] [blame]
#!/usr/bin/python
# Copyright (C) 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os.path
import select
import sys
import time
import collections
import socket
import gflags as flags # http://code.google.com/p/python-gflags/
import pkgutil
import threading
import Queue
import traceback
import math
import bisect
from bisect import bisect_left
"""
scipy, numpy and matplotlib are python packages that can be installed
from: http://www.scipy.org/
"""
import scipy
import matplotlib.pyplot as plt
# Let this script know about the power monitor implementations: the
# "power_monitors" package lives in the same directory as this file, so that
# directory (not the file's own name) is what must be on the import path.
sys.path = [os.path.dirname(os.path.abspath(__file__))] + sys.path
# Names of every module found in the power_monitors package, excluding
# private ones (those starting with an underscore).
available_monitors = [
    name
    for _, name, _ in pkgutil.iter_modules(
        [os.path.join(os.path.dirname(__file__), "power_monitors")])
    if not name.startswith("_")]
APK = os.path.join(os.path.dirname(__file__), "..", "CtsVerifier.apk")
FLAGS = flags.FLAGS
# DELAY_SCREEN_OFF is the number of seconds to wait for baseline state
DELAY_SCREEN_OFF = 20.0
# whether to log data collected to a file for each sensor run:
LOG_DATA_TO_FILE = True
logging.getLogger().setLevel(logging.ERROR)
def do_import(name):
    """Import a module by its dotted name and return it.

    Args:
        name: Fully qualified module name, e.g. "power_monitors.monsoon".
    Returns:
        The module object for the last component of the dotted name (not the
        top-level package).
    Raises:
        ImportError: if the module cannot be imported.
    """
    # importlib resolves the dotted path itself, so no manual getattr() walk
    # from the top-level package down to the leaf module is needed.
    import importlib
    return importlib.import_module(name)
class PowerTestException(Exception):
    """
    Definition of specialized Exception class for CTS power tests
    """

    def __init__(self, message):
        """
        Args:
            message: Description of the failure; returned by str().
        """
        # Initialise the base class so that args/repr()/pickling carry the
        # message as they do for built-in exceptions.
        super(PowerTestException, self).__init__(message)
        self._error_message = message

    def __str__(self):
        # __str__ must return a string even if a non-string was supplied.
        return str(self._error_message)
class PowerTest:
    """Class to run a suite of power tests. This has methods for obtaining
    measurements from the power monitor (through the driver) and then
    processing it to determine baseline and AP suspend state and
    measure ampere draw of various sensors.
    Ctrl+C causes a keyboard interrupt exception which terminates the test."""

    # Thresholds for max allowed power usage per sensor tested
    # TODO: Accel, Mag and Gyro have no maximum power specified in the CDD;
    # the following numbers are bogus and will be replaced soon by what
    # the device reports (from Sensor.getPower())
    MAX_ACCEL_AMPS = 0.08  # Amps
    MAX_MAG_AMPS = 0.08  # Amps
    MAX_GYRO_AMPS = 0.08  # Amps
    MAX_SIGMO_AMPS = 0.08  # Amps
    # TODO: The following numbers for step counter, etc must be replaced by
    # the numbers specified in CDD for low-power sensors. The expected current
    # draw must be computed from the specified power and the voltage used to
    # power the device (specified from a config file).
    MAX_STEP_COUNTER_AMPS = 0.08  # Amps
    MAX_STEP_DETECTOR_AMPS = 0.08  # Amps
    # EXPECTED_AMPS_VARIATION_HALF_RANGE denotes the expected variation (in
    # amperes) of the measurements around the central value at baseline
    # state, i.e. we expect most of the ampere measurements at baseline state
    # to lie within +/- this number of that central value.
    EXPECTED_AMPS_VARIATION_HALF_RANGE = 0.0005
    # THRESHOLD_BASELINE_SAMPLES_FRACTION denotes the minimum fraction of
    # samples that must be in the range of variation defined by
    # EXPECTED_AMPS_VARIATION_HALF_RANGE for us to decide that the phone has
    # settled into its baseline state
    THRESHOLD_BASELINE_SAMPLES_FRACTION = 0.86
    # MAX_PERCENTILE_AP_SCREEN_OFF_AMPS denotes the maximum ampere draw that
    # the device can consume when it has gone to suspend state with one or
    # more sensors registered and batching samples (screen and AP are off in
    # this case)
    MAX_PERCENTILE_AP_SCREEN_OFF_AMPS = 0.030  # Amps
    # PERCENTILE_MAX_AP_SCREEN_OFF denotes the fraction of ampere
    # measurements that must be below the specified maximum amperes
    # MAX_PERCENTILE_AP_SCREEN_OFF_AMPS for us to decide that the phone has
    # reached suspend state.
    PERCENTILE_MAX_AP_SCREEN_OFF = 0.95
    DOMAIN_NAME = "/android/cts/powertest"
    # SAMPLE_COUNT_NOMINAL denotes the typical number of measurements of
    # amperes to collect from the power monitor
    SAMPLE_COUNT_NOMINAL = 1000
    # RATE_NOMINAL denotes the nominal frequency at which ampere measurements
    # are taken from the monsoon power monitor
    RATE_NOMINAL = 100
    ENABLE_PLOTTING = False
    # Command strings sent to the device over the forwarded socket.
    REQUEST_EXTERNAL_STORAGE = "EXTERNAL STORAGE?"
    REQUEST_EXIT = "EXIT"
    REQUEST_RAISE = "RAISE %s %s"
    REQUEST_USER_RESPONSE = "USER RESPONSE %s"
    REQUEST_SET_TEST_RESULT = "SET TEST RESULT %s %s %s"
    REQUEST_SENSOR_SWITCH = "SENSOR %s %s"
    REQUEST_SENSOR_AVAILABILITY = "SENSOR? %s"
    REQUEST_SCREEN_OFF = "SCREEN OFF"
    REQUEST_SHOW_MESSAGE = "MESSAGE %s"
    NEGATIVE_AMPERE_ERROR_MESSAGE = (
        "Negative ampere draw measured, possibly due to power "
        "supply from USB cable. Check the setup of device and power "
        "monitor to make sure that the device is not connected "
        "to machine via USB directly. The device should be "
        "connected to the USB slot in the power monitor. It is okay "
        "to change the wiring when the test is in progress.")

    def __init__(self, max_baseline_amps):
        """
        Args:
            max_baseline_amps: The maximum value of baseline amperes
                that we expect the device to consume at baseline state.
                This can be different between models of phones.
        """
        # Plain attributes first, so finalize() and the error-reporting
        # helpers can run even if setting up the monitor or device fails.
        self._power_monitor = None
        self._tcp_connect_port = 0  # any available port
        self._max_baseline_amps = max_baseline_amps
        # the following describes power test being run (i.e on what sensor
        # and what type of test. This is used for logging.
        self._current_test = "None"
        power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
        testid = time.strftime("%d_%m_%Y__%H__%M_%S")
        self._power_monitor = power_monitors.Power_Monitor(log_file_id=testid)
        print("Establishing connection to device...")
        self.setUsbEnabled(True)
        status = self._power_monitor.GetStatus()
        self._native_hz = status["sampleRate"] * 1000
        self._external_storage = self.executeOnDevice(
            PowerTest.REQUEST_EXTERNAL_STORAGE)

    def __del__(self):
        self.finalize()

    def finalize(self):
        """To be called upon termination of host connection to device.

        Asks the device to leave its connection loop and removes the adb port
        forwarding (only if a port was ever set up), then closes the power
        monitor. Any exception raised while talking to the device still
        propagates to the caller, but the power monitor is closed first.
        """
        try:
            # getattr: __del__ may call this on a partially built instance.
            if getattr(self, "_tcp_connect_port", 0) > 0:
                # tell device side to exit connection loop, and remove the
                # forwarding connection
                self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors=False)
                self.executeLocal(
                    "adb forward --remove tcp:%d" % self._tcp_connect_port)
                self._tcp_connect_port = 0
        finally:
            monitor = getattr(self, "_power_monitor", None)
            if monitor:
                monitor.Close()
                self._power_monitor = None

    def _send(self, msg, report_errors=True):
        """Connect to the device, send the given command, and then disconnect.

        Args:
            msg: Command string to send.
            report_errors: If True, a failed connection ends the test through
                endTestIfLostConnection(); if False the connection error is
                re-raised to the caller.
        Returns:
            The response read from the device (at most 4096 bytes).
        """
        if self._tcp_connect_port == 0:
            # on first attempt to send a command, connect to device via any
            # open port number, forwarding that port to a local socket on the
            # device via adb
            logging.debug("Seeking port for communication...")
            # discover an open port
            dummysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                dummysocket.bind(("localhost", 0))
                (_, self._tcp_connect_port) = dummysocket.getsockname()
            finally:
                dummysocket.close()
            assert self._tcp_connect_port > 0
            status = self.executeLocal(
                "adb forward tcp:%d localabstract:%s" %
                (self._tcp_connect_port, PowerTest.DOMAIN_NAME))
            # If the status !=0, then the host machine is unable to forward
            # requests to client over adb. Ending the test and logging error
            # message to the console on the host.
            self.endTestIfLostConnection(
                status != 0,
                "Unable to forward requests to client over adb")
            logging.info("Forwarding requests over local port %d",
                         self._tcp_connect_port)
        link = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                logging.debug("Connecting to device...")
                link.connect(("localhost", self._tcp_connect_port))
                logging.debug("Connected.")
            except socket.error as serr:
                print("Socket connection error:  %s" % (serr,))
                print("Finalizing and exiting the test")
                self.endTestIfLostConnection(
                    report_errors,
                    "Unable to communicate with device: connection refused")
                # Only reached when report_errors is False: surface the
                # connection failure itself instead of attempting to send on
                # a socket that never connected.
                raise
            except Exception:
                print("Non socket-related exception at this block in _send(); "
                      "re-raising now.")
                raise
            logging.debug("Sending '%s'", msg)
            link.sendall(msg)
            logging.debug("Getting response...")
            response = link.recv(4096)
            logging.debug("Got response '%s'", response)
            return response
        finally:
            # Release the socket on every path, including errors.
            link.close()

    def queryDevice(self, query):
        """Post a yes/no query to the device, return True upon successful
        query, False otherwise"""
        logging.info("Querying device with '%s'", query)
        return self._send(query) == "OK"

    # TODO: abstract device communication (and string commands) into its own
    # class
    def executeOnDevice(self, cmd, reportErrors=True):
        """Execute a (string) command on the remote device and return the
        device's response."""
        return self._send(cmd, reportErrors)

    def executeLocal(self, cmd, check_status=True):
        """Execute a shell command locally (on the host).

        Args:
            cmd: Command line; it is split on single spaces, so arguments
                must not contain spaces.
            check_status: If True, a non-zero exit status is logged as an
                error.
        Returns:
            The exit status of the command.
        """
        from subprocess import call
        status = call(cmd.split(" "))
        if status != 0 and check_status:
            logging.error("Failed to execute \"%s\"", cmd)
        else:
            logging.debug("Executed \"%s\"", cmd)
        return status

    def reportErrorRaiseExceptionIf(self, condition, msg):
        """Report an error condition to the device if condition is True, then
        finalize the connection and exit.

        Args:
            condition: If true, this reports error
            msg: Message related to the error
        Raises:
            SystemExit: carrying msg, whenever condition is true (raised via
                sys.exit after finalize()).
        """
        if not condition:
            return
        try:
            logging.error("Exiting on error: %s", msg)
            self.executeOnDevice(
                PowerTest.REQUEST_RAISE % (self._current_test, msg),
                reportErrors=True)
        except Exception:
            # Best effort only: the device may already be unreachable.
            logging.error("Unable to communicate with device to report "
                          "error: %s", msg)
        self.finalize()
        sys.exit(msg)

    def endTestIfLostConnection(self, lost_connection, error_message):
        """
        This function ends the test if lost_connection was true,
        which indicates that the connection to the device was lost.
        Args:
            lost_connection: boolean variable, if True it indicates that
                connection to device was lost and the test must be terminated.
            error_message: String to print to the host console before exiting
                the test (if lost_connection is True)
        Returns:
            None.
        """
        if lost_connection:
            logging.error(error_message)
            self.finalize()
            sys.exit(error_message)

    def setUsbEnabled(self, enabled, verbose=True):
        """Switch the power monitor's USB pass-through on or off.

        When enabling, waits for the device to come online through adb and
        re-establishes the port forwarding if one was set up before.

        Args:
            enabled: True to enable USB pass-through, False to disable it.
            verbose: If True, progress is reported.
        """
        val = 1 if enabled else 0
        self._power_monitor.SetUsbPassthrough(val)
        tries = 0
        # Sometimes command won't go through first time, particularly if
        # immediately after a data collection, so allow for retries
        # TODO: Move this retry mechanism to the power monitor driver.
        status = self._power_monitor.GetStatus()
        while status is None and tries < 5:
            tries += 1
            time.sleep(2.0)
            logging.error("Retrying get status call...")
            self._power_monitor.StopDataCollection()
            self._power_monitor.SetUsbPassthrough(val)
            status = self._power_monitor.GetStatus()
        if enabled:
            if verbose:
                print("...USB enabled, waiting for device")
            self.executeLocal("adb wait-for-device")
            if verbose:
                print("...device online")
        elif verbose:
            logging.info("...USB disabled")
        # re-establish port forwarding
        if enabled and self._tcp_connect_port > 0:
            status = self.executeLocal(
                "adb forward tcp:%d localabstract:%s" %
                (self._tcp_connect_port, PowerTest.DOMAIN_NAME))
            self.reportErrorRaiseExceptionIf(
                status != 0,
                msg="Unable to forward requests to client over adb")

    def computeBaselineState(self, measurements):
        """
        Args:
            measurements: List of floats containing ampere draw measurements
                taken from the monsoon power monitor.
                Must be at least 100 measurements long
        Returns:
            A tuple (isBaseline, mean_current) where isBaseline is a
            boolean that is True only if the baseline state for the phone is
            detected. mean_current is the average of the samples lying
            within EXPECTED_AMPS_VARIATION_HALF_RANGE of the median; it is
            meaningful only if baseline state is detected. With fewer than
            100 measurements the result is (False, -1).
        """
        # Looks at the measurements to see if it is in baseline state
        if len(measurements) < 100:
            print("Need at least 100 measurements to determine if baseline "
                  "state has been reached")
            return (False, -1)
        # Assumption: At baseline state, the power profile is Gaussian
        # distributed with low-variance around the mean current draw.
        # Ideally we should find the mode from a histogram bin to find an
        # estimated mean. Assuming here that the median is very close to this
        # value; later we check that the variance of the samples is low
        # enough to validate baseline.
        sorted_measurements = sorted(measurements)
        number_measurements = len(sorted_measurements)
        middle = number_measurements // 2
        if number_measurements % 2:
            median_measurement = sorted_measurements[middle]
        else:
            median_measurement = (sorted_measurements[middle - 1] +
                                  sorted_measurements[middle]) / 2.0
        # Assume that at baseline state, a large fraction of power
        # measurements are within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE
        # amperes of the average baseline current. Find all such measurements
        # in the sorted measurement vector: indices [left_index, right_index).
        half_range = PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE
        left_index = bisect_left(sorted_measurements,
                                 median_measurement - half_range)
        right_index = bisect_left(sorted_measurements,
                                  median_measurement + half_range)
        # Average over exactly the samples counted as "good" below.
        in_range = sorted_measurements[left_index:right_index]
        if in_range:
            average_baseline_amps = sum(in_range) / float(len(in_range))
        else:
            # Possible when the two middle samples are far apart.
            average_baseline_amps = float("nan")
        detected_baseline = True
        # We enforce that a fraction of more than
        # 'THRESHOLD_BASELINE_SAMPLES_FRACTION' of samples must be within
        # +/- EXPECTED_AMPS_VARIATION_HALF_RANGE amperes of the mean baseline
        # current, which we have estimated as the median.
        if (len(in_range) <
                PowerTest.THRESHOLD_BASELINE_SAMPLES_FRACTION *
                number_measurements):
            detected_baseline = False
        # We check for the maximum limit of the expected baseline
        if median_measurement > self._max_baseline_amps:
            detected_baseline = False
        if average_baseline_amps < 0:
            print(PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE)
            detected_baseline = False
        print("%s baseline state" %
              ("Could detect" if detected_baseline else "Could NOT detect"))
        print("median amps = %f, avg amps = %f, fraction of good samples = %f" %
              (median_measurement, average_baseline_amps,
               float(len(in_range)) / number_measurements))
        if PowerTest.ENABLE_PLOTTING:
            plt.plot(measurements)
            plt.show()
            print("To continue test, please close the plot window manually.")
        return (detected_baseline, average_baseline_amps)

    def isApInSuspendState(self, measurements_amps, nominal_max_amps,
                           test_percentile):
        """
        This function detects AP suspend and display off state of phone
        after a sensor has been registered.
        Because the power profile can be very different between sensors and
        even across builds, it is difficult to specify a tight threshold for
        mean current draw or mandate that the power measurements must have low
        variance. We use a criteria that allows for a certain fraction of
        peaks in power spectrum and checks that test_percentile fraction of
        measurements must be below the specified value nominal_max_amps
        Args:
            measurements_amps: amperes draw measurements from power monitor
            nominal_max_amps: the specified value of the max current draw
            test_percentile: the fraction of measurements we require to be
                below a specified amps value
        Returns:
            returns a boolean which is True if and only if the AP suspend and
            display off state is detected. Any negative measurement yields
            False.
        """
        if any(m < 0 for m in measurements_amps):
            print(PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE)
            return False
        count_good = sum(1 for m in measurements_amps if m < nominal_max_amps)
        return count_good > test_percentile * len(measurements_amps)

    def getBaselineState(self):
        """This function first disables all sensors, then collects
        measurements through the power monitor and continuously evaluates if
        baseline state is reached. Once baseline state is detected, it is
        verified over a longer interval and the verification result is
        returned. If baseline is not detected in a preset maximum number of
        trials, it returns as well.
        Returns:
            Returns a tuple (isBaseline, mean_current) where isBaseline is a
            boolean that is True only if the baseline state for the phone is
            detected. mean_current is an estimate of the average baseline
            current for the device, which is valid only if baseline state is
            detected. If no trial detects a baseline, (False, -1) is returned.
        """
        self.setPowerOn("ALL", False)
        self.setUsbEnabled(False)
        print("Waiting %d seconds for baseline state" % DELAY_SCREEN_OFF)
        time.sleep(DELAY_SCREEN_OFF)
        MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION = 5  # seconds
        NUMBER_MEASUREMENTS_BASELINE_DETECTION = (
            PowerTest.RATE_NOMINAL *
            MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION)
        NUMBER_MEASUREMENTS_BASELINE_VERIFICATION = (
            NUMBER_MEASUREMENTS_BASELINE_DETECTION * 5)
        MAX_TRIALS = 50
        collected_baseline_measurements = False
        for tries in range(MAX_TRIALS):
            print("Trial number %d of %d..." % (tries, MAX_TRIALS))
            measurements = self.collectMeasurements(
                NUMBER_MEASUREMENTS_BASELINE_DETECTION, PowerTest.RATE_NOMINAL,
                verbose=False)
            if self.computeBaselineState(measurements)[0]:
                collected_baseline_measurements = True
                break
        if not collected_baseline_measurements:
            return (False, -1)
        print("Verifying baseline state over a longer interval "
              "in order to double check baseline state")
        measurements = self.collectMeasurements(
            NUMBER_MEASUREMENTS_BASELINE_VERIFICATION, PowerTest.RATE_NOMINAL,
            verbose=False)
        self.reportErrorRaiseExceptionIf(
            not measurements, "No background measurements could be taken")
        retval = self.computeBaselineState(measurements)
        if retval[0]:
            print("Verified baseline.")
            if measurements and LOG_DATA_TO_FILE:
                with open("/tmp/cts-power-tests-background-data.log", "w") as f:
                    for m in measurements:
                        f.write("%.4f\n" % m)
        return retval

    def waitForApSuspendMode(self):
        """This function repeatedly collects measurements until AP suspend
        and display off mode is detected. After a maximum number of trials,
        if this state is not reached, it reports an error and exits through
        reportErrorRaiseExceptionIf().
        Returns:
            boolean which is True if device was detected to be in suspend
            state
        """
        print("waitForApSuspendMode(): Sleeping for %d seconds" %
              DELAY_SCREEN_OFF)
        time.sleep(DELAY_SCREEN_OFF)
        NUMBER_MEASUREMENTS = 200
        # Maximum trials for which to collect measurements to get to Ap
        # suspend state
        MAX_TRIALS = 50
        got_to_suspend_state = False
        for count in range(MAX_TRIALS):
            print("waitForApSuspendMode(): Trial %d of %d" % (count, MAX_TRIALS))
            measurements = self.collectMeasurements(NUMBER_MEASUREMENTS,
                                                    PowerTest.RATE_NOMINAL,
                                                    verbose=False)
            if self.isApInSuspendState(
                    measurements, PowerTest.MAX_PERCENTILE_AP_SCREEN_OFF_AMPS,
                    PowerTest.PERCENTILE_MAX_AP_SCREEN_OFF):
                got_to_suspend_state = True
                break
        self.reportErrorRaiseExceptionIf(
            got_to_suspend_state is False,
            msg="Unable to determine application processor suspend mode "
                "status.")
        print("Got to AP suspend state")
        return got_to_suspend_state

    def collectMeasurements(self, measurementCount, rate, verbose=True):
        """Collect ampere measurements, averaging consecutive raw samples
        down from the monitor's native rate to the requested rate.

        Args:
            measurementCount: Number of measurements to collect from the power
                monitor
            rate: The integer frequency in Hertz at which to collect
                measurements from the power monitor
            verbose: If True, the latest measurement is echoed to the console.
        Returns:
            A list containing at least measurementCount measurements from the
            power monitor. If fewer could be collected, the error is reported
            through reportErrorRaiseExceptionIf(), which exits.
        """
        assert (measurementCount > 0)
        # Number of raw samples averaged into one measurement (at least 1).
        decimate_by = self._native_hz // rate or 1
        self._power_monitor.StartDataCollection()
        sub_measurements = []
        measurements = []
        tries = 0
        if verbose:
            print("")
        try:
            while len(measurements) < measurementCount and tries < 5:
                if tries:
                    # The previous read returned nothing: restart collection.
                    self._power_monitor.StopDataCollection()
                    self._power_monitor.StartDataCollection()
                    time.sleep(1.0)
                tries += 1
                additional = self._power_monitor.CollectData()
                if additional is None:
                    continue
                tries = 0
                sub_measurements.extend(additional)
                # Average every complete group of decimate_by raw samples and
                # keep the incomplete remainder for the next read.
                complete = len(sub_measurements) - (
                    len(sub_measurements) % decimate_by)
                for start in range(0, complete, decimate_by):
                    group = sub_measurements[start:start + decimate_by]
                    measurements.append(sum(group) / decimate_by)
                    if verbose:
                        # "\33[1A\33[2K" is a special Linux console control
                        # sequence for moving to the previous line, and
                        # erasing it; and reprinting new text on that
                        # erased line.
                        sys.stdout.write("\33[1A\33[2K")
                        print("MEASURED[%d]: %f" %
                              (len(measurements), measurements[-1]))
                sub_measurements = sub_measurements[complete:]
        finally:
            self._power_monitor.StopDataCollection()
        self.reportErrorRaiseExceptionIf(
            measurementCount > len(measurements),
            "Unable to collect all requested measurements")
        return measurements

    def requestUserAcknowledgment(self, msg):
        """Post message to user on screen and wait for acknowledgment"""
        response = self.executeOnDevice(PowerTest.REQUEST_USER_RESPONSE % msg)
        self.reportErrorRaiseExceptionIf(
            response != "OK", "Unable to request user acknowledgment")

    def setTestResult(self, test_name, test_result, test_message):
        """
        Reports the result of a test to the device
        Args:
            test_name: name of the test
            test_result: result of the test; the callers in this class pass
                the strings "PASS", "FAIL" or "SKIPPED"
            test_message: Relevant message
        """
        print("Test %s : %s" % (test_name, test_result))
        response = self.executeOnDevice(
            PowerTest.REQUEST_SET_TEST_RESULT %
            (test_name, test_result, test_message))
        self.reportErrorRaiseExceptionIf(
            response != "OK", "Unable to send test status to Verifier")

    def setPowerOn(self, sensor, powered_on):
        """Ask the device to switch a sensor on or off.

        Args:
            sensor: Sensor name as understood by the device ("ALL" is used by
                callers in this class to address every sensor).
            powered_on: True to switch the sensor on, False to switch it off.
        Returns:
            The device's response. A response of "ERR" is reported through
            reportErrorRaiseExceptionIf(), which exits.
        """
        state = "ON" if powered_on else "OFF"
        response = self.executeOnDevice(
            PowerTest.REQUEST_SENSOR_SWITCH % (state, sensor))
        self.reportErrorRaiseExceptionIf(
            response == "ERR", "Unable to set sensor %s state" % sensor)
        logging.info("Set %s %s", sensor, state)
        return response

    def runSensorPowerTest(
            self, sensor, max_amperes_allowed, baseline_amps,
            user_request=None):
        """
        Runs power test for a specific sensor; i.e. measures the amperes draw
        of the phone using monsoon, with the specified sensor registered
        and the phone in suspend state; and verifies that the incremental
        consumed amperes is within expected bounds.
        Args:
            sensor: The specified sensor for which to run the power test
            max_amperes_allowed: Maximum ampere draw of the device with the
                sensor registered and device in suspend state
            baseline_amps: The power draw of the device when it is in baseline
                state (no sensors registered, display off, AP asleep)
            user_request: Optional instructions shown to the user; when given,
                the test is run "under motion", otherwise with the device
                still.
        """
        motion_label = "Under_Motion" if user_request is not None else "Still"
        self._current_test = "%s_Power_Test_While_%s" % (sensor, motion_label)
        try:
            print("\n\n---------------------------------")
            if user_request is not None:
                print("Running power test on %s under motion." % sensor)
            else:
                print("Running power test on %s while device is still." %
                      sensor)
            print("---------------------------------")
            response = self.executeOnDevice(
                PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor)
            self.setPowerOn("ALL", False)
            if response == "UNAVAILABLE":
                # Report the skip exactly once, after all sensors are off.
                self.setTestResult(
                    self._current_test, test_result="SKIPPED",
                    test_message=(
                        "Sensor %s not available on this device" % sensor))
                return
            # NOTE(review): 'response' here is still the reply to the
            # availability query, not to the "ALL OFF" request above (whose
            # "ERR" reply is already handled inside setPowerOn) - confirm the
            # intended check against the device-side protocol.
            self.reportErrorRaiseExceptionIf(
                response != "OK", "Unable to set all sensor off")
            self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
            self.setUsbEnabled(False)
            self.setUsbEnabled(True)
            self.setPowerOn(sensor, True)
            if user_request is not None:
                print("===========================================\n" +
                      "==> Please follow the instructions presented on the "
                      "device\n" +
                      "===========================================")
                self.requestUserAcknowledgment(user_request)
            self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
            self.setUsbEnabled(False)
            # NOTE(review): same stale 'response' as above; failures to
            # switch the sensor on are caught by setPowerOn's "ERR" check.
            self.reportErrorRaiseExceptionIf(
                response != "OK", "Unable to set sensor %s ON" % sensor)
            self.waitForApSuspendMode()
            print("Collecting sensor %s measurements" % sensor)
            measurements = self.collectMeasurements(
                PowerTest.SAMPLE_COUNT_NOMINAL, PowerTest.RATE_NOMINAL)
            if measurements and LOG_DATA_TO_FILE:
                log_path = ("/tmp/cts-power-tests-%s-%s-sensor-data.log" %
                            (sensor, motion_label))
                with open(log_path, "w") as f:
                    for m in measurements:
                        f.write("%.4f\n" % m)
                # The file is closed (and flushed) before it is pushed.
                self.setUsbEnabled(True, verbose=False)
                print("Saving raw data files to device...")
                self.executeLocal(
                    "adb shell mkdir -p %s" % self._external_storage, False)
                self.executeLocal(
                    "adb push %s %s/." % (log_path, self._external_storage))
                self.setUsbEnabled(False, verbose=False)
            self.reportErrorRaiseExceptionIf(
                not measurements,
                "No measurements could be taken for %s" % sensor)
            avg = sum(measurements) / len(measurements)
            squared = [(m - avg) * (m - avg) for m in measurements]
            stddev = math.sqrt(sum(squared) / len(squared))
            current_diff = avg - baseline_amps
            self.setUsbEnabled(True)
            max_power = max(measurements) - avg
            within_limits = current_diff <= max_amperes_allowed
            if within_limits:
                # TODO: fail the test of background > current
                template = (
                    "Draw is within limits. Sensor delta:%f mAmp Baseline:%f "
                    "mAmp Sensor: %f mAmp Stddev : %f mAmp Peak: %f mAmp")
            else:
                template = (
                    "Draw is too high. Current:%f Background:%f Measured: %f "
                    "Stddev: %f Peak: %f")
            # All values are reported in milliamperes.
            message = template % (
                current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0,
                stddev * 1000.0, max_power * 1000.0)
            self.setTestResult(
                self._current_test,
                ("PASS" if within_limits else "FAIL"),
                message)
            print("Result: " + message)
        except BaseException:
            # Deliberately broad: the error helpers exit via SystemExit, and
            # the failure must still be reported before it propagates.
            traceback.print_exc()
            self.setTestResult(
                self._current_test, test_result="FAIL",
                test_message="Exception occurred during run of test.")
            raise

    @staticmethod
    def runTests(max_baseline_amps):
        """Run the whole suite: detect the baseline state, then run every
        sensor power test under motion and with the device still.

        Args:
            max_baseline_amps: The maximum baseline amperes expected for the
                device under test; passed to the PowerTest constructor.
        Raises:
            KeyboardInterrupt: re-raised when the user interrupts the run.
                Every other exception is printed as a traceback and not
                propagated.
        """
        testrunner = None
        try:
            GENERIC_MOTION_REQUEST = (
                "\n===> Please press Next and when the "
                "screen is off, keep the device under motion with only tiny, "
                "slow movements until the screen turns on again.\nPlease "
                "refrain from interacting with the screen or pressing any side "
                "buttons while measurements are taken.")
            USER_STEPS_REQUEST = (
                "\n===> Please press Next and when the "
                "screen is off, then move the device to simulate step motion "
                "until the screen turns on again.\nPlease refrain from "
                "interacting with the screen or pressing any side buttons "
                "while measurements are taken.")
            testrunner = PowerTest(max_baseline_amps)
            testrunner.executeOnDevice(
                PowerTest.REQUEST_SHOW_MESSAGE % "Connected. Running tests...")
            is_baseline_success, baseline_amps = testrunner.getBaselineState()
            if is_baseline_success:
                testrunner.setUsbEnabled(True)
                # TODO: Enable testing a single sensor
                # (sensor, max allowed amps, user request), run in this order.
                test_plan = (
                    ("SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS,
                     GENERIC_MOTION_REQUEST),
                    ("STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS,
                     USER_STEPS_REQUEST),
                    ("STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS,
                     USER_STEPS_REQUEST),
                    ("ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS,
                     GENERIC_MOTION_REQUEST),
                    ("MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS,
                     GENERIC_MOTION_REQUEST),
                    ("GYROSCOPE", PowerTest.MAX_GYRO_AMPS,
                     GENERIC_MOTION_REQUEST),
                    ("ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, None),
                    ("MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, None),
                    ("GYROSCOPE", PowerTest.MAX_GYRO_AMPS, None),
                    ("SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, None),
                    ("STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, None),
                    ("STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, None),
                )
                for sensor, max_amps, request in test_plan:
                    testrunner.runSensorPowerTest(
                        sensor, max_amps, baseline_amps, user_request=request)
            else:
                print("Could not get to baseline state. This is either because "
                      "in several trials, the monitor could not measure a set "
                      "of power measurements that had the specified low "
                      "variance or the mean measurements were below the "
                      "expected value. None of the sensor power measurement "
                      " tests were performed due to not being able to detect "
                      "baseline state. Please re-run the power tests.")
        except KeyboardInterrupt:
            print("Keyboard interrupt from user.")
            raise
        except BaseException:
            # Top-level boundary of the suite: report and carry on to the
            # cleanup below (this includes SystemExit from the error helpers).
            traceback.print_exc()
        finally:
            logging.info("TESTS COMPLETE")
            if testrunner:
                try:
                    testrunner.finalize()
                except socket.error:
                    sys.exit(
                        "===================================================\n"
                        "Unable to connect to device under test. Make sure \n"
                        "the device is connected via the usb pass-through, \n"
                        "the CtsVerifier app is running the SensorPowerTest on \n"
                        "the device, and USB pass-through is enabled.\n"
                        "===================================================")
def _stream_samples(mon):
    """Collect FLAGS.samples samples from mon at FLAGS.hz and print them.

    Args:
        mon: An open power monitor; it is closed once streaming starts,
            whatever the outcome.
    Returns:
        0 when streaming ended normally, 1 when interrupted by the user.
    """
    # Make sure state is normal
    mon.StopDataCollection()
    status = mon.GetStatus()
    native_hz = status["sampleRate"] * 1000
    # Collect and average samples as specified
    mon.StartDataCollection()
    # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant:
    # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz
    # This is the error accumulator in a variation of Bresenham's algorithm.
    emitted = offset = 0
    collected = []
    history_deque = collections.deque()  # past n samples for rolling average
    try:
        last_flush = time.time()
        while emitted < FLAGS.samples or FLAGS.samples == -1:
            # The number of raw samples to consume before emitting the next
            # output
            need = (native_hz - offset + FLAGS.hz - 1) // FLAGS.hz
            if need > len(collected):  # still need more input samples
                samples = mon.CollectData()
                if not samples:
                    break
                collected.extend(samples)
                continue
            # Have enough data, generate output samples.
            # Adjust for consuming 'need' input samples.
            offset += need * FLAGS.hz
            while offset >= native_hz:  # maybe multiple, if FLAGS.hz > native_hz
                this_sample = sum(collected[:need]) / need
                fields = []
                if FLAGS.timestamp:
                    fields.append("%d" % int(time.time()))
                if FLAGS.avg:
                    history_deque.appendleft(this_sample)
                    if len(history_deque) > FLAGS.avg:
                        history_deque.pop()
                    fields.append("%f %f" % (
                        this_sample,
                        sum(history_deque) / len(history_deque)))
                else:
                    fields.append("%f" % this_sample)
                print(" ".join(fields))
                sys.stdout.flush()
                offset -= native_hz
                emitted += 1  # adjust for emitting 1 output sample
            collected = collected[need:]
            now = time.time()
            if now - last_flush >= 0.99:  # flush every second
                sys.stdout.flush()
                last_flush = now
    except KeyboardInterrupt:
        print("interrupted")
        return 1
    finally:
        mon.Close()
    return 0


def main(argv):
    """Simple command-line interface for a power test application.

    Reads its options from FLAGS: configures the power monitor (voltage,
    max current, USB pass-through), optionally prints its status, streams
    samples, and/or runs the sensor power test suite.

    Args:
        argv: Remaining command-line arguments (unused).
    Returns:
        0 or 1 after streaming samples (1 when interrupted); None otherwise.
    """
    useful_flags = ["voltage", "status", "usbpassthrough",
                    "samples", "current", "log", "power_monitor"]
    if not any(FLAGS.get(f, None) is not None for f in useful_flags):
        # This module may have no docstring, in which case __doc__ is None.
        print((__doc__ or "").strip())
        print(FLAGS.MainModuleHelp())
        return
    if FLAGS.avg and FLAGS.avg < 0:
        logging.error("--avg must be greater than 0")
        return
    if FLAGS.voltage is not None and FLAGS.voltage > 5.5:
        print("!!WARNING: Voltage higher than typical values!!!")
        try:
            response = raw_input(
                "Voltage of %.3f requested. Confirm this is correct (Y/N)" %
                FLAGS.voltage)
        except (Exception, KeyboardInterrupt):
            sys.exit("Aborting.")
        if response.upper() != "Y":
            sys.exit("Aborting")
    if not FLAGS.power_monitor:
        # List each available monitor on its own line after the explanation.
        sys.exit(
            "You must specify a '--power_monitor' option to specify which power "
            "monitor type you are using.\nOne of:\n  " +
            "\n  ".join(available_monitors))
    power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
    try:
        mon = power_monitors.Power_Monitor(device=FLAGS.device)
    except Exception:
        traceback.print_exc()
        sys.exit("No power monitors found")
    if FLAGS.voltage is not None:
        # 'ramp' is a boolean flag: honour --noramp by setting the voltage
        # directly.
        if FLAGS.ramp:
            mon.RampVoltage(mon.start_voltage, FLAGS.voltage)
        else:
            mon.SetVoltage(FLAGS.voltage)
    if FLAGS.current is not None:
        mon.SetMaxCurrent(FLAGS.current)
    if FLAGS.status:
        items = sorted(mon.GetStatus().items())
        print("\n".join(["%s: %s" % item for item in items]))
    if FLAGS.usbpassthrough:
        passthrough_modes = {"off": 0, "on": 1, "auto": 2}
        if FLAGS.usbpassthrough not in passthrough_modes:
            mon.Close()
            sys.exit("bad pass-through flag: %s" % FLAGS.usbpassthrough)
        mon.SetUsbPassthrough(passthrough_modes[FLAGS.usbpassthrough])
    if FLAGS.samples:
        return _stream_samples(mon)
    if FLAGS.run:
        # --power_monitor is guaranteed to be set here (checked above).
        try:
            PowerTest.runTests(FLAGS.max_baseline_amps)
        except KeyboardInterrupt:
            print("Keyboard interrupt from user")
if __name__ == "__main__":
    # Command-line flags as (definer, name, default, help text); they are
    # registered in the order listed before the arguments are parsed.
    _FLAG_SPECS = (
        (flags.DEFINE_boolean, "status", None, "Print power meter status"),
        (flags.DEFINE_integer, "avg", None,
         "Also report average over last n data points"),
        (flags.DEFINE_float, "voltage", None, "Set output voltage (0 for off)"),
        (flags.DEFINE_float, "current", None, "Set max output current"),
        (flags.DEFINE_string, "usbpassthrough", None,
         "USB control (on, off, auto)"),
        (flags.DEFINE_integer, "samples", None,
         "Collect and print this many samples"),
        (flags.DEFINE_integer, "hz", 5000, "Print this many samples/sec"),
        (flags.DEFINE_string, "device", None,
         "Path to the device in /dev/... (ex:/dev/ttyACM1)"),
        (flags.DEFINE_boolean, "timestamp", None,
         "Also print integer (seconds) timestamp on each line"),
        (flags.DEFINE_boolean, "ramp", True, "Gradually increase voltage"),
        (flags.DEFINE_boolean, "log", False, "Log progress to a file or not"),
        (flags.DEFINE_boolean, "run", False, "Run the test suite for power"),
        (flags.DEFINE_string, "power_monitor", None,
         "Type of power monitor to use"),
        (flags.DEFINE_float, "max_baseline_amps", 0.005,
         "Set maximum baseline current for device being tested"),
    )
    for _define, _flag_name, _flag_default, _flag_help in _FLAG_SPECS:
        _define(_flag_name, _flag_default, _flag_help)
    # Parse the command line, run the tool, and exit with main()'s result.
    sys.exit(main(FLAGS(sys.argv)))