blob: 810ed1f9038f361985f399c95297a7f7b3efbe71 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2020 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import tempfile
from acts.controllers import power_metrics
from acts.controllers.monsoon_lib.api.common import MonsoonError
class ResourcesRegistryError(Exception):
    """Raised when a registry entry would be overwritten with a new value."""
# Module-level resources registry. Entries are added by update_registry()
# and the dictionary itself is handed out by get_registry().
_REGISTRY = {}
def update_registry(registry):
    """Merges the given entries into the module-level resources registry.

    Registering a key again with a value equal to the stored one is a no-op
    (the stored value is kept). Overriding a previous value with a different
    one is not allowed.

    All entries are validated before any of them is stored, so a rejected
    update leaves the registry exactly as it was.

    Args:
        registry: A dictionary with the entries to register.

    Raises:
        ResourcesRegistryError if a property is updated with a different
        value.
    """
    # Collect brand new entries on the side instead of writing them straight
    # into _REGISTRY; a conflict found later must not leave a partial update.
    pending = {}
    for key, value in registry.items():
        if key not in _REGISTRY:
            pending[key] = value
        elif not value == _REGISTRY[key]:
            raise ResourcesRegistryError(
                'Overwriting resources_registry fields is not allowed. %s was '
                'already defined as %s and was attempted to be overwritten '
                'with %s.' % (key, _REGISTRY[key], value))
    # Only reached when no entry conflicts with the stored values.
    _REGISTRY.update(pending)
def get_registry():
    """Returns the module-level resources registry.

    The registry dictionary itself is returned, not a copy of it.
    """
    return _REGISTRY
def _write_raw_data_in_standard_format(raw_data, path, start_time):
    """Dumps samples to a text file, one '<timestamp> <amps>' pair per line.

    Every timestamp found in raw_data is shifted by start_time before being
    written, so the output is expressed in (seconds since epoch, amps).

    TODO(b/155294049): Deprecate this once Monsoon controller output
    format is updated.

    Args:
        raw_data: raw data as list or generator of (timestamp, sample)
        path: path to write output
        start_time: Measurement start time in seconds since epoch
    """
    with open(path, 'w') as out_file:
        # The generator is consumed lazily by writelines, so samples are
        # still formatted and written one at a time.
        out_file.writelines(
            '%s %s\n' % (offset + start_time, current)
            for offset, current in raw_data)
class BasePowerMonitor:
    """Interface shared by power monitor implementations.

    Every method accepts arbitrary keyword arguments and raises
    NotImplementedError here; subclasses provide the actual behavior.
    """

    def setup(self, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def connect_usb(self, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def measure(self, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def release_resources(self, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def disconnect_usb(self, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def get_metrics(self, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def get_waveform(self, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError()

    def teardown(self, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError()
class PowerMonitorMonsoonFacade(BasePowerMonitor):
    """Power monitor that delegates its work to a Monsoon controller.

    Every method ignores keyword arguments it does not know about, so the
    same keyword set can be passed to all of them.
    """

    def __init__(self, monsoon):
        """Constructs a PowerMonitorFacade.

        Args:
            monsoon: delegate monsoon object, either
                acts.controllers.monsoon_lib.api.hvpm.monsoon.Monsoon or
                acts.controllers.monsoon_lib.api.lvpm_stock.monsoon.Monsoon.
        """
        self.monsoon = monsoon
        self._log = logging.getLogger()

    def setup(self, monsoon_config=None, **__):
        """Set up the Monsoon controller for this testclass/testcase.

        Sets the output voltage (4.2 when the config has no 'voltage') and,
        only if the config contains 'max_current', the maximum current.

        Args:
            monsoon_config: configuration object supporting `in` and
                get_numeric(key[, default]).

        Raises:
            MonsoonError if monsoon_config is None.
        """
        if monsoon_config is None:
            raise MonsoonError('monsoon_config can not be None')

        self._log.info('Setting up Monsoon %s', self.monsoon.serial)
        voltage = monsoon_config.get_numeric('voltage', 4.2)
        self.monsoon.set_voltage_safe(voltage)
        if 'max_current' in monsoon_config:
            self.monsoon.set_max_current(
                monsoon_config.get_numeric('max_current'))

    def power_cycle(self, monsoon_config=None, **__):
        """Power cycles the delegated monsoon controller.

        The voltage is dropped to 0, the current limits are applied when the
        config contains 'max_current', USB is connected and finally the
        configured voltage (4.2 by default) is restored.

        Args:
            monsoon_config: configuration object supporting `in` and
                get_numeric(key[, default]).

        Raises:
            MonsoonError if monsoon_config is None.
        """
        if monsoon_config is None:
            raise MonsoonError('monsoon_config can not be None')

        self._log.info('Setting up Monsoon %s', self.monsoon.serial)
        voltage = monsoon_config.get_numeric('voltage', 4.2)
        self._log.info('Setting up Monsoon voltage %s', voltage)
        # Power off first; the target voltage is only applied at the end.
        self.monsoon.set_voltage_safe(0)
        if 'max_current' in monsoon_config:
            self.monsoon.set_max_current(
                monsoon_config.get_numeric('max_current'))
            self.monsoon.set_max_initial_current(
                monsoon_config.get_numeric('max_current'))
        self.connect_usb()
        self.monsoon.set_voltage_safe(voltage)

    def connect_usb(self, **__):
        """Turns the Monsoon USB passthrough on."""
        self.monsoon.usb('on')

    def measure(self, measurement_args=None, start_time=None,
                monsoon_output_path=None, **__):
        """Runs a power measurement on the delegated Monsoon.

        The measurement is written to a temporary file. When both
        monsoon_output_path and start_time are given, the samples are then
        copied to monsoon_output_path with their timestamps offset by
        start_time; otherwise nothing is kept.

        Args:
            measurement_args: dictionary of keyword arguments forwarded to
                the Monsoon's measure_power.
            start_time: Measurement start time in seconds since epoch.
            monsoon_output_path: path where the converted samples are
                written.

        Raises:
            MonsoonError if measurement_args is None.
        """
        if measurement_args is None:
            raise MonsoonError('measurement_args can not be None')

        with tempfile.NamedTemporaryFile(prefix='monsoon_') as tmon:
            self.monsoon.measure_power(**measurement_args,
                                       output_path=tmon.name)

            # The conversion has to happen inside the with block: the
            # temporary file is removed as soon as the block is left.
            if monsoon_output_path and start_time is not None:
                _write_raw_data_in_standard_format(
                    power_metrics.import_raw_data(tmon.name),
                    monsoon_output_path, start_time)

    def release_resources(self, **__):
        """Does nothing; there are no resources to release."""
        # nothing to do
        pass

    def disconnect_usb(self, **__):
        """Turns the Monsoon USB passthrough off."""
        self.monsoon.usb('off')

    def get_waveform(self, file_path=None, **__):
        """Parses a file to obtain all current (in amps) samples.

        Args:
            file_path: Path to a monsoon file.
            **__: ignored; accepted for consistency with the other methods
                and with BasePowerMonitor.get_waveform.

        Returns:
            A list of tuples in which the first element is a timestamp and the
            second element is the sampled current at that time.

        Raises:
            MonsoonError if file_path is None.
        """
        if file_path is None:
            raise MonsoonError('file_path can not be None')

        return list(power_metrics.import_raw_data(file_path))

    def get_metrics(self, start_time=None, voltage=None,
                    monsoon_file_path=None, timestamps=None, **__):
        """Parses a monsoon_file_path to compute the consumed power and other
        power related metrics.

        Args:
            start_time: Time when the measurement started, this is used to
                correlate timestamps from the device and from the power
                samples. It is required, but this method only checks that it
                was provided.
            voltage: Voltage used when the measurement started. Used to
                compute power from current.
            monsoon_file_path: Path to a monsoon file.
            timestamps: Named timestamps delimiting the segments of interest.
            **__: ignored.

        Returns:
            A list of power_metrics.Metric.

        Raises:
            MonsoonError if any of start_time, voltage, monsoon_file_path or
            timestamps is None.
        """
        if start_time is None:
            raise MonsoonError('start_time can not be None')
        if voltage is None:
            raise MonsoonError('voltage can not be None')
        if monsoon_file_path is None:
            raise MonsoonError('monsoon_file_path can not be None')
        if timestamps is None:
            raise MonsoonError('timestamps can not be None')

        return power_metrics.generate_test_metrics(
            power_metrics.import_raw_data(monsoon_file_path),
            timestamps=timestamps, voltage=voltage)

    def teardown(self, **__):
        """Does nothing; there is nothing to tear down."""
        # nothing to do
        pass