| #!/usr/bin/env python3 |
| # |
| # Copyright 2020 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import tempfile |
| |
| from acts.controllers import power_metrics |
| from acts.controllers.monsoon_lib.api.common import MonsoonError |
| |
| |
class ResourcesRegistryError(Exception):
    """Error raised when a resources registry update is rejected."""
| |
| |
# Module-level dictionary holding the registered resources. It is filled by
# update_registry() and handed out as-is by get_registry().
_REGISTRY = {}
| |
| |
def update_registry(registry):
    """Updates the registry with the one passed.

    Keys that are not registered yet are added. Keys that are already
    registered with an equal value are left untouched. Overriding a previous
    value with a different one is not allowed.

    The whole update is validated before anything is stored, so a rejected
    update leaves the registry exactly as it was before the call.

    Args:
        registry: A dictionary with the entries to register.
    Raises:
        ResourcesRegistryError if a property is updated with a different
        value.
    """
    # First pass: look for conflicts without touching the registry, so a
    # failure half way through the dictionary cannot leave a partial update.
    for k, v in registry.items():
        if k not in _REGISTRY or v == _REGISTRY[k]:
            continue
        raise ResourcesRegistryError(
            'Overwriting resources_registry fields is not allowed. %s was '
            'already defined as %s and was attempted to be overwritten '
            'with %s.' % (k, _REGISTRY[k], v))

    # Second pass: store only the new keys. setdefault keeps the object that
    # is already registered when an equal value is passed again.
    for k, v in registry.items():
        _REGISTRY.setdefault(k, v)
| |
| |
def get_registry():
    """Returns the module-level registry dictionary itself (not a copy)."""
    return _REGISTRY
| |
| |
def _write_raw_data_in_standard_format(raw_data, path, start_time):
    """Dumps raw samples to a text file as '<timestamp> <amps>' lines.

    Every sample timestamp is shifted by start_time before it is written, and
    each sample ends up on its own line. The destination file is opened in
    write mode, so any previous content is replaced.

    TODO(b/155294049): Deprecate this once Monsoon controller output
    format is updated.

    Args:
        raw_data: raw data as list or generator of (timestamp, sample)
        path: path to write output
        start_time: Measurement start time in seconds since epoch
    """
    with open(path, 'w') as out:
        # The generator is consumed lazily, one sample at a time.
        out.writelines('%s %s\n' % (offset + start_time, sample)
                       for offset, sample in raw_data)
| |
| |
class BasePowerMonitor(object):
    """Interface shared by power monitor implementations.

    Every method accepts arbitrary keyword arguments and raises
    NotImplementedError in this base class; concrete monitors are expected to
    override the methods they support.
    """

    def setup(self, **kwargs):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError()

    def connect_usb(self, **kwargs):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError()

    def measure(self, **kwargs):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError()

    def release_resources(self, **kwargs):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError()

    def disconnect_usb(self, **kwargs):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError()

    def get_metrics(self, **kwargs):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError()

    def get_waveform(self, **kwargs):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError()

    def teardown(self, **kwargs):
        """Not implemented in the base class; always raises."""
        raise NotImplementedError()
| |
| |
class PowerMonitorMonsoonFacade(BasePowerMonitor):
    """BasePowerMonitor implementation that delegates to a Monsoon object."""

    def __init__(self, monsoon):
        """Constructs a PowerMonitorFacade.

        Args:
            monsoon: delegate monsoon object, either
                acts.controllers.monsoon_lib.api.hvpm.monsoon.Monsoon or
                acts.controllers.monsoon_lib.api.lvpm_stock.monsoon.Monsoon.
        """
        self.monsoon = monsoon
        self._log = logging.getLogger()

    def setup(self, monsoon_config=None, **__):
        """Set up the Monsoon controller for this testclass/testcase.

        Applies the configured voltage (4.2 when 'voltage' is not configured)
        and, when 'max_current' is present in the config, the maximum
        current.

        Args:
            monsoon_config: configuration object; it must provide
                get_numeric(key[, default]) and support the `in` operator.
            **__: ignored.

        Raises:
            MonsoonError: if monsoon_config is None.
        """
        if monsoon_config is None:
            raise MonsoonError('monsoon_config can not be None')

        self._log.info('Setting up Monsoon %s', self.monsoon.serial)
        voltage = monsoon_config.get_numeric('voltage', 4.2)
        self.monsoon.set_voltage_safe(voltage)
        if 'max_current' in monsoon_config:
            self.monsoon.set_max_current(
                monsoon_config.get_numeric('max_current'))

    def power_cycle(self, monsoon_config=None, **__):
        """Power cycles the delegated monsoon controller.

        The voltage is first set to 0, the current limits are applied when
        'max_current' is configured, USB is connected and finally the
        configured voltage (4.2 when 'voltage' is not configured) is
        restored.

        Args:
            monsoon_config: configuration object; it must provide
                get_numeric(key[, default]) and support the `in` operator.
            **__: ignored.

        Raises:
            MonsoonError: if monsoon_config is None.
        """
        if monsoon_config is None:
            raise MonsoonError('monsoon_config can not be None')

        self._log.info('Setting up Monsoon %s', self.monsoon.serial)
        voltage = monsoon_config.get_numeric('voltage', 4.2)
        self._log.info('Setting up Monsoon voltage %s', voltage)
        # Drop the output to 0 before reconfiguring; the target voltage is
        # only applied again at the very end.
        self.monsoon.set_voltage_safe(0)
        if 'max_current' in monsoon_config:
            self.monsoon.set_max_current(
                monsoon_config.get_numeric('max_current'))
            self.monsoon.set_max_initial_current(
                monsoon_config.get_numeric('max_current'))
        self.connect_usb()
        self.monsoon.set_voltage_safe(voltage)

    def connect_usb(self, **__):
        """Turns the Monsoon USB passthrough on."""
        self.monsoon.usb('on')

    def measure(self, measurement_args=None, start_time=None,
                monsoon_output_path=None, **__):
        """Runs a power measurement on the delegated Monsoon.

        The measurement is written by the Monsoon to a temporary file. When
        both monsoon_output_path and start_time are given, the samples are
        additionally written to monsoon_output_path with start_time added to
        every timestamp.

        Args:
            measurement_args: dictionary of keyword arguments forwarded to
                monsoon.measure_power.
            start_time: value added to each sample timestamp in the exported
                file. The export is skipped when it is None.
            monsoon_output_path: path of the exported file. The export is
                skipped when it is None or empty.
            **__: ignored.

        Raises:
            MonsoonError: if measurement_args is None.
        """
        if measurement_args is None:
            raise MonsoonError('measurement_args can not be None')

        # The export has to happen inside the with block: the temporary file
        # goes away as soon as the context manager exits.
        with tempfile.NamedTemporaryFile(prefix='monsoon_') as tmon:
            self.monsoon.measure_power(**measurement_args,
                                       output_path=tmon.name)

            if monsoon_output_path and start_time is not None:
                _write_raw_data_in_standard_format(
                    power_metrics.import_raw_data(tmon.name),
                    monsoon_output_path, start_time)

    def release_resources(self, **__):
        """No-op; this facade holds no resources to release."""
        # nothing to do
        pass

    def disconnect_usb(self, **__):
        """Turns the Monsoon USB passthrough off."""
        self.monsoon.usb('off')

    def get_waveform(self, file_path=None, **__):
        """Parses a file to obtain all current (in amps) samples.

        Args:
            file_path: Path to a monsoon file.
            **__: ignored; accepted so this method can be called with the
                same keyword arguments as the other methods of the class.

        Returns:
            A list of tuples in which the first element is a timestamp and the
            second element is the sampled current at that time.

        Raises:
            MonsoonError: if file_path is None.
        """
        if file_path is None:
            raise MonsoonError('file_path can not be None')

        return list(power_metrics.import_raw_data(file_path))

    def get_metrics(self, start_time=None, voltage=None, monsoon_file_path=None,
                    timestamps=None, **__):
        """Parses a monsoon_file_path to compute the consumed power and other
        power related metrics.

        Args:
            start_time: Time when the measurement started. It is required,
                but this method only validates it; it is not forwarded to
                the metrics computation.
            voltage: Voltage used when the measurement started. Used to compute
                power from current.
            monsoon_file_path: Path to a monsoon file.
            timestamps: Named timestamps delimiting the segments of interest.
            **__: ignored.

        Returns:
            The value returned by power_metrics.generate_test_metrics,
            expected to be a list of power_metrics.Metric.

        Raises:
            MonsoonError: if any of start_time, voltage, monsoon_file_path or
                timestamps is None.
        """
        if start_time is None:
            raise MonsoonError('start_time can not be None')
        if voltage is None:
            raise MonsoonError('voltage can not be None')
        if monsoon_file_path is None:
            raise MonsoonError('monsoon_file_path can not be None')
        if timestamps is None:
            raise MonsoonError('timestamps can not be None')

        return power_metrics.generate_test_metrics(
            power_metrics.import_raw_data(monsoon_file_path),
            timestamps=timestamps, voltage=voltage)

    def teardown(self, **__):
        """No-op; nothing needs to be torn down."""
        # nothing to do
        pass