blob: becc4ee99cb142242f68383461748a255fac5084 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import struct
import time
import numpy as np
from acts.controllers.monsoon_lib.api.lvpm_stock.monsoon_proxy import MonsoonProxy
from acts.controllers.monsoon_lib.sampling.common import UncalibratedSampleChunk
from acts.controllers.monsoon_lib.sampling.engine.assembly_line import BufferList
from acts.controllers.monsoon_lib.sampling.engine.assembly_line import ProcessAssemblyLineBuilder
from acts.controllers.monsoon_lib.sampling.engine.assembly_line import ThreadAssemblyLineBuilder
from acts.controllers.monsoon_lib.sampling.engine.calibration import CalibrationError
from acts.controllers.monsoon_lib.sampling.engine.transformer import ParallelTransformer
from acts.controllers.monsoon_lib.sampling.engine.transformer import SequentialTransformer
from acts.controllers.monsoon_lib.sampling.engine.transformer import SourceTransformer
from acts.controllers.monsoon_lib.sampling.engine.transformer import Transformer
from acts.controllers.monsoon_lib.sampling.enums import Channel
from acts.controllers.monsoon_lib.sampling.enums import Granularity
from acts.controllers.monsoon_lib.sampling.enums import Origin
from acts.controllers.monsoon_lib.sampling.lvpm_stock.calibrations import LvpmCalibrationData
from acts.controllers.monsoon_lib.sampling.lvpm_stock.calibrations import LvpmCalibrationSnapshot
from acts.controllers.monsoon_lib.sampling.lvpm_stock.packet import Packet
from acts.controllers.monsoon_lib.sampling.lvpm_stock.packet import SampleType
class StockLvpmSampler(Transformer):
    """Entry-point transformer that samples a Monsoon for the caller.

    Attributes:
        monsoon_serial: The serial number passed on to the PacketCollector.
        duration: The sampling duration passed on to the PacketCollector.
    """

    def __init__(self, monsoon_serial, duration):
        super().__init__()
        self.monsoon_serial = monsoon_serial
        self.duration = duration

    def _transform(self, input_stream):
        """Builds and runs the process-based sampling assembly line.

        The line starts at a PacketCollector, flows into a SampleNormalizer
        and writes to this transformer's output stream.

        Args:
            input_stream: Not read here; the PacketCollector is the source of
                the assembly line.
        """
        builder = ProcessAssemblyLineBuilder()
        collector = PacketCollector(self.monsoon_serial, self.duration)
        stages = builder.source(collector).into(SampleNormalizer())
        assembly_line = stages.build(output_stream=self.output_stream)
        assembly_line.run()
class PacketCollector(SourceTransformer):
    """Collects Monsoon packets into a buffer to be sent to another process.

    Attributes:
        start_time: The time.time() value recorded when collection began.
        sampling_duration: How long to sample for, in seconds. A falsy value
            (None or 0) disables the time limit.
    """

    def __init__(self, serial=None, sampling_duration=None):
        super().__init__()
        self._monsoon_serial = serial
        self._monsoon_proxy = None
        self.start_time = 0
        self.sampling_duration = sampling_duration

    def _initialize_monsoon(self):
        """Initializes the MonsoonProxy object."""
        self._monsoon_proxy = MonsoonProxy(serialno=self._monsoon_serial)

    def on_begin(self):
        """Begins data collection."""
        self.start_time = time.time()
        self._initialize_monsoon()
        self._monsoon_proxy.start_data_collection()

    def on_end(self):
        """Stops data collection and closes the serial port."""
        proxy = self._monsoon_proxy
        if proxy is None:
            # The proxy was never created (e.g. on_begin failed before
            # constructing it), so there is nothing to shut down.
            return
        try:
            proxy.stop_data_collection()
        finally:
            # Release the serial port even when the stop command fails.
            proxy.ser.close()

    def _transform_buffer(self, buffer):
        """Fills the given buffer with raw monsoon data at each entry.

        Every stored entry is a 'dd' struct header (time since start_time and
        the time spent on the read, both in seconds) followed by the packet
        body. Entries whose read failed are left untouched.

        Args:
            buffer: An indexable, sized container that is written in-place.

        Returns:
            The filled buffer, or None once sampling_duration has elapsed.
        """
        if (self.sampling_duration
                and self.sampling_duration < time.time() - self.start_time):
            return None

        for index in range(len(buffer)):
            time_before_read = time.time()
            data = self._read_packet()
            if data is None:
                continue
            time_after_read = time.time()
            time_data = struct.pack('dd', time_after_read - self.start_time,
                                    time_after_read - time_before_read)
            buffer[index] = time_data + data

        return buffer

    def _read_packet(self):
        """Reads a single packet from the serial port.

        Packets are sent as Length-Value-Checksum, where the first byte is the
        length, and the following bytes are the value and checksum. The
        checksum is stored in the final byte, and is calculated as the 8
        least-significant bits of the sum of the length byte and all value
        bytes.

        Returns:
            None if the read failed. Otherwise, the packet data received
            (without the length and checksum bytes).
        """
        len_char = self._monsoon_proxy.ser.read(1)
        if not len_char:
            logging.warning('Reading from serial timed out.')
            return None

        data_len = ord(len_char)
        if not data_len:
            logging.warning('Unable to read packet length.')
            return None

        result = bytearray(self._monsoon_proxy.ser.read(data_len))
        if len(result) != data_len:
            logging.warning(
                'Length mismatch, expected %d bytes, got %d bytes.', data_len,
                len(result))
            return None

        body = result[:-1]
        # The length byte seeds the sum, so it is part of the checksum.
        checksum = sum(body, data_len) & 0xFF
        if result[-1] != checksum:
            logging.warning(
                'Invalid checksum from serial port! Expected %s, '
                'got %s', hex(checksum), hex(result[-1]))
            return None
        return body
class SampleNormalizer(Transformer):
    """Normalizes the raw packet data into reading values."""

    def _transform(self, input_stream):
        """Builds and runs the thread-based normalization assembly line.

        The line reads packets from input_stream with a PacketReader, passes
        them through a SampleChunker and a CalibrationApplier, and writes to
        this transformer's output stream.

        Args:
            input_stream: The stream the PacketReader source reads from.
        """
        builder = ThreadAssemblyLineBuilder()
        reading = builder.source(PacketReader(), input_stream=input_stream)
        chunking = reading.into(SampleChunker())
        calibrating = chunking.into(CalibrationApplier())
        calibrating.build(output_stream=self.output_stream).run()

    def _transform_buffer(self, buffer):
        """Does nothing; _transform is overridden, so this is never needed."""
class PacketReader(ParallelTransformer):
    """Reads the raw packets and converts them into LVPM Packet objects."""

    def _transform_buffer(self, buffer):
        """Converts the raw packets to Packet objects in-place in buffer.

        Entries that are not valid sample packets are replaced with None.

        Args:
            buffer: A list of bytes objects. Will be in-place replaced with
                Packet objects.

        Returns:
            The same buffer, holding a Packet or None at each entry.
        """
        # Size of the two time.time() values prepended by PacketCollector.
        time_bytes_size = struct.calcsize('dd')

        for i, raw_packet in enumerate(buffer):
            if raw_packet is None:
                # NOTE(review): PacketCollector leaves an entry untouched when
                # a read fails; presumably such an entry may still be None --
                # confirm against the buffer allocation code.
                continue

            # Unpacks the two time.time() values sent by PacketCollector.
            time_since_start, time_of_read = struct.unpack(
                'dd', raw_packet[:time_bytes_size])
            packet = raw_packet[time_bytes_size:]

            if not packet:
                # An empty body has no type byte to report, so it cannot use
                # the warning below without raising an IndexError.
                logging.warning(
                    'Tried to collect power sample values, received an empty '
                    'packet instead.')
                buffer[i] = None
                continue

            # Magic number explanation:
            # LVPM sample packets begin with 4 bytes, have at least one
            # measurement (8 bytes), and have 1 last byte (usually a \x00
            # byte).
            if len(packet) < 4 + 8 + 1 or packet[0] & 0x20 != 0x20:
                logging.warning(
                    'Tried to collect power sample values, received data of '
                    'type=0x%02x, len=%d instead.', packet[0], len(packet))
                buffer[i] = None
                continue

            buffer[i] = Packet(packet, time_since_start, time_of_read)

        return buffer
class SampleChunker(SequentialTransformer):
    """Chunks input packets into lists of samples with identical calibration.

    This step helps to quickly apply calibration across many samples at once.

    Attributes:
        _stored_raw_samples: The queue of raw samples that have yet to be
            split into a new calibration group.
        calibration_data: The calibration window information.
    """

    def __init__(self):
        super().__init__()
        self._stored_raw_samples = []
        self.calibration_data = LvpmCalibrationData()

    def _on_end_of_stream(self, input_stream):
        """Sends the samples still queued as a final chunk, then ends."""
        self._send_buffers(BufferList([self._cut_new_buffer()]))
        super()._on_end_of_stream(input_stream)

    def _transform_buffer(self, buffer):
        """Takes in data from the buffer and splits it based on calibration.

        This transformer is meant to run after the PacketReader.

        Args:
            buffer: A list of Packet objects.

        Returns:
            A BufferList containing 0 or more UncalibratedSampleChunk objects.
        """
        buffer_list = BufferList()
        for packet in buffer:
            # If a read packet was not a sample, the PacketReader returns None.
            # Skip over these dud values.
            if packet is None:
                continue

            for sample in packet:
                sample_type = sample.get_sample_type()

                if sample_type == SampleType.MEASUREMENT:
                    self._stored_raw_samples.append(sample)
                elif SampleType.is_calibration(sample_type):
                    # Close the current chunk first so that its samples keep
                    # the calibration that was in effect when they were read.
                    if self._stored_raw_samples:
                        buffer_list.append(self._cut_new_buffer())
                    self.calibration_data.add_calibration_sample(sample)
                else:
                    # There's no information on what this packet means within
                    # Monsoon documentation or code.
                    logging.warning(
                        'Received unidentifiable packet with '
                        'SampleType %s: %s', sample_type,
                        packet.get_bytes(0, None))
        return buffer_list

    def _cut_new_buffer(self):
        """Cuts a new buffer from the input stream data.

        The queued samples are handed to the new chunk together with a
        snapshot of the current calibration data, and the queue is reset.

        Returns:
            The newly generated UncalibratedSampleChunk.
        """
        calibration_snapshot = LvpmCalibrationSnapshot(self.calibration_data)
        new_chunk = UncalibratedSampleChunk(self._stored_raw_samples,
                                            calibration_snapshot)
        self._stored_raw_samples = []
        return new_chunk
class LvpmReading(object):
    """A fully calibrated sample, holding every Monsoon reading it produced.

    Readings can be added together and divided by a number, combining the
    main, USB and aux currents as well as the sample time.

    Attributes:
        _reading_list: The list of values obtained from the Monsoon.
        _time_of_reading: The time since sampling began that the reading was
            collected at.
    """

    def __init__(self, reading_list, time_of_reading):
        """Creates an LvpmReading.

        Args:
            reading_list: The values of the reading, indexed as
                [0] Main Current
                [1] USB Current
                [2] Aux Current
            time_of_reading: The time the reading was received.
        """
        self._reading_list = reading_list
        self._time_of_reading = time_of_reading

    @property
    def main_current(self):
        """The main channel current (entry 0 of the reading list)."""
        return self._reading_list[0]

    @property
    def usb_current(self):
        """The USB channel current (entry 1 of the reading list)."""
        return self._reading_list[1]

    @property
    def aux_current(self):
        """The aux channel current (entry 2 of the reading list)."""
        return self._reading_list[2]

    @property
    def sample_time(self):
        """The time associated with this reading."""
        return self._time_of_reading

    def __add__(self, other):
        """Returns a new reading with the currents and sample times summed."""
        mine = (self.main_current, self.usb_current, self.aux_current)
        theirs = (other.main_current, other.usb_current, other.aux_current)
        summed = [left + right for left, right in zip(mine, theirs)]
        return LvpmReading(summed, self.sample_time + other.sample_time)

    def __truediv__(self, other):
        """Returns a new reading with currents and sample time over other."""
        currents = (self.main_current, self.usb_current, self.aux_current)
        scaled = [current / other for current in currents]
        return LvpmReading(scaled, self.sample_time / other)
class CalibrationApplier(ParallelTransformer):
    """Applies the calibration formula to all of the given samples.

    Designed to come after a SampleChunker Transformer.
    """

    @staticmethod
    def _is_device_calibrated(data):
        """Checks to see if the Monsoon has completed calibration.

        Args:
            data: the calibration data.

        Returns:
            True if the data is calibrated. False otherwise.
        """
        try:
            # If the data is calibrated for any Origin.REFERENCE value, it is
            # calibrated for all Origin.REFERENCE values. The same is true for
            # Origin.ZERO.
            data.get(Channel.MAIN, Origin.REFERENCE, Granularity.COARSE)
            data.get(Channel.MAIN, Origin.ZERO, Granularity.COARSE)
        except CalibrationError:
            return False
        return True

    def _transform_buffer(self, buffer):
        """Replaces the raw samples of a chunk with calibrated LvpmReadings.

        Args:
            buffer: A chunk exposing `calibration_data` and `samples`. The
                samples list is overwritten in-place.

        Returns:
            An empty list if the device is not calibrated yet or the chunk
            holds no samples. Otherwise, buffer.samples, with every entry
            replaced by an LvpmReading.
        """
        calibration_data = buffer.calibration_data

        if not self._is_device_calibrated(calibration_data):
            return []

        samples = buffer.samples
        if len(samples) == 0:
            # An empty chunk produces a 1-D measurements array, which cannot
            # be indexed by channel below. There is nothing to calibrate.
            return []

        measurements = np.array([sample.values for sample in samples])
        # Columns not indexed by a Channel value are left at 0.
        readings = np.zeros((len(samples), 5))

        for channel in Channel.values:
            fine_zero = calibration_data.get(channel, Origin.ZERO,
                                             Granularity.FINE)
            fine_scale = calibration_data.get(channel, Origin.SCALE,
                                              Granularity.FINE)
            coarse_zero = calibration_data.get(channel, Origin.ZERO,
                                               Granularity.COARSE)
            coarse_scale = calibration_data.get(channel, Origin.SCALE,
                                                Granularity.COARSE)

            # A set LSB means a coarse measurement. This bit needs to be
            # cleared before setting calibration. Note that the
            # reverse-engineered algorithm does not rightshift the bits after
            # this operation. This explains the mismatch of calibration
            # constants between the reverse-engineered algorithm and the
            # Monsoon.py algorithm.
            raw = measurements[:, channel]
            readings[:, channel] = np.where(
                raw & 1,
                ((raw & ~1) - coarse_zero) * coarse_scale,
                (raw - fine_zero) * fine_scale)

        for i, row in enumerate(readings):
            samples[i] = LvpmReading(list(row), samples[i].get_sample_time())

        return samples