blob: 064310f97ea5ce3bdf9f29ed15aaf4748ba3996c [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import time
from Monsoon import HVPM
from Monsoon import Operations as op
from acts.controllers.monsoon_lib.api.common import MonsoonResult
from acts.controllers.monsoon_lib.api.monsoon import BaseMonsoon
from acts.controllers.monsoon_lib.sampling.engine.assembly_line import AssemblyLineBuilder
from acts.controllers.monsoon_lib.sampling.engine.assembly_line import ThreadAssemblyLine
from acts.controllers.monsoon_lib.sampling.engine.transformers import DownSampler
from acts.controllers.monsoon_lib.sampling.engine.transformers import SampleAggregator
from acts.controllers.monsoon_lib.sampling.engine.transformers import Tee
from acts.controllers.monsoon_lib.sampling.hvpm.transformers import HvpmTransformer
class Monsoon(BaseMonsoon):
    """The controller class for interacting with the HVPM Monsoon."""

    # The device doesn't officially support voltages lower than this. Note that
    # 0 is a valid voltage.
    MIN_VOLTAGE = 0.8

    # The Monsoon doesn't support setting higher voltages than this directly
    # without tripping overvoltage.
    # Note that it is possible to increase the voltage above this value by
    # increasing the voltage by small increments over a period of time.
    # The communication protocol supports up to 16V.
    MAX_VOLTAGE = 13.5

    def __init__(self, serial):
        """Opens the USB connection to the HVPM Monsoon with the given serial.

        Args:
            serial: The serial number of the Monsoon to connect to. It is
                stored and reused for every later USB (re)connection.

        Raises:
            ValueError: if no device is present after the USB setup.
        """
        super().__init__()
        self.serial = serial
        self._mon = HVPM.Monsoon()
        self._mon.setup_usb(serial)
        # Tracks whether this object currently holds the USB connection; it is
        # cleared by release_monsoon_connection().
        self._allocated = True

        if self._mon.Protocol.DEVICE is None:
            raise ValueError('HVPM Monsoon %s could not be found.' % serial)

    def set_voltage(self, voltage):
        """Sets the output voltage of monsoon.

        Args:
            voltage: The voltage to set the output to.
        """
        self._log.debug('Setting voltage to %sV.', voltage)
        self._mon.setVout(voltage)

    def set_max_current(self, amperes):
        """Sets monsoon's max output current.

        Args:
            amperes: The max current in A.
        """
        self._mon.setRunTimeCurrentLimit(amperes)

    def set_max_initial_current(self, amperes):
        """Sets the max power-up/initial current.

        Args:
            amperes: The max initial current allowed in amperes.
        """
        self._mon.setPowerUpCurrentLimit(amperes)

    @property
    def status(self):
        """Gets the status params of monsoon.

        The status packet is refreshed from the device on every access.

        Returns:
            A dictionary of {status param, value} key-value pairs.
        """
        self._mon.fillStatusPacket()
        return self._mon.statusPacket

    def _set_usb_passthrough_mode(self, mode):
        """Sends the call to set usb passthrough mode.

        Args:
            mode: The state to set the USB passthrough to. Can either be the
                string name of the state or the integer value.

                "Off" or 0 means USB always off.
                "On" or 1 means USB always on.
                "Auto" or 2 means USB is automatically turned off during
                    sampling, and turned back on after sampling.
        """
        self._mon.setUSBPassthroughMode(mode)

    def _get_main_voltage(self):
        """Returns the value of the voltage on the main channel."""
        # Any getValue call on a setX function will return the value set for X.
        # Using this, we can pull the last setMainVoltage (or its default).
        raw_voltage = self._mon.Protocol.getValue(op.OpCodes.setMainVoltage, 4)
        return raw_voltage / op.Conversion.FLOAT_TO_INT

    def measure_power(self,
                      duration,
                      measure_after_seconds=0,
                      hz=5000,
                      output_path=None,
                      transformers=None):
        """See parent docstring for details.

        The sampling pipeline is assembled in this order: the HVPM source, an
        optional DownSampler (only when hz != 5000), an optional Tee (only
        when output_path is given), the SampleAggregator, and finally any
        caller-supplied transformers.
        """
        voltage = self._get_main_voltage()
        aggregator = SampleAggregator(measure_after_seconds)

        # The context manager shuts the Manager's server process down on exit,
        # including when sampling raises, so the process is never leaked.
        with multiprocessing.Manager() as manager:
            assembly_line_builder = AssemblyLineBuilder(
                manager.Queue, ThreadAssemblyLine)
            assembly_line_builder.source(
                HvpmTransformer(self.serial, duration + measure_after_seconds))
            if hz != 5000:
                # NOTE(review): the factor is truncated to an int, so an hz
                # that does not evenly divide 5000 yields an effective rate
                # different from the hz reported in the result, and hz > 5000
                # yields a factor of 0 -- confirm DownSampler's contract.
                assembly_line_builder.into(DownSampler(int(5000 / hz)))
            if output_path:
                assembly_line_builder.into(
                    Tee(output_path, measure_after_seconds))
            assembly_line_builder.into(aggregator)
            if transformers:
                for transformer in transformers:
                    assembly_line_builder.into(transformer)

            self.take_samples(assembly_line_builder.build())

        # Re-open the USB connection on this object once sampling is done.
        self._mon.setup_usb(self.serial)
        self._allocated = True

        monsoon_data = MonsoonResult(aggregator.num_samples,
                                     aggregator.sum_currents, hz, voltage,
                                     output_path)
        self._log.info('Measurement summary:\n%s', str(monsoon_data))
        return monsoon_data

    def reconnect_monsoon(self):
        """Reconnect Monsoon to serial port."""
        self.release_monsoon_connection()
        self._log.info('Closed monsoon connection.')
        # Presumably gives the device time to settle before reconnecting --
        # TODO confirm the required delay.
        time.sleep(5)
        self.establish_monsoon_connection()

    def release_monsoon_connection(self):
        """Closes the USB connection and marks the Monsoon as unallocated."""
        self._mon.closeDevice()
        self._allocated = False

    def is_allocated(self):
        """Returns True if this object currently holds the USB connection."""
        return self._allocated

    def establish_monsoon_connection(self):
        """Opens the USB connection and marks the Monsoon as allocated."""
        self._mon.setup_usb(self.serial)
        # Makes sure the Monsoon is in the command-receiving state.
        self._mon.stopSampling()
        # Mirror release_monsoon_connection(): the flag is only restored once
        # the device has been set up and accepted a command.
        self._allocated = True