blob: 80c8274bd8afc616f934cea521c93099ec38b94f [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import struct
from acts.controllers.monsoon_lib.sampling.enums import Reading
from acts.controllers.monsoon_lib.sampling.enums import Granularity
class SampleType:
"""An enum-like class that defines the SampleTypes for LVPM data.
Note that these values differ from the HVPM values.
"""
# A measurement sample.
MEASUREMENT = 0x00
# A zero calibration sample.
ZERO_CAL = 0x01
# A reference calibration sample.
REF_CAL = 0x02
@staticmethod
def is_calibration(value):
"""Returns true iff the SampleType is a type of calibration."""
return value == SampleType.ZERO_CAL or value == SampleType.REF_CAL
class LvpmMeasurement(object):
"""An object that tracks an individual measurement within the LvpmPacket.
Attributes:
_sample_time: The time the sample was taken.
_sample_type: The type of sample stored.
values: From reverse engineering, the values are as follows:
If the measurement is a calibration measurement:
Val │ Byte │ Type │ Monsoon │ Reading │
Pos │ Offset │ Format │ Channel │ Type │ Description
────┼────────┼────────┼─────────┼─────────┼──────────────────────────────
0 │ 0 │ uint16 │ Main │ Current │ Calibration value.
1 │ 2 │ uint16 │ USB │ Current │ Calibration value.
2 │ 4 │ uint16 │ Aux │ Current │ Calibration value.
3 │ 6 │ uint16 │ Main │ Voltage │ Calibration value.
If the measurement is a power reading:
Val │ Byte │ Type │ Monsoon │ Reading │
Pos │ Offset │ Format │ Channel │ Type │ Description
────┼────────┼────────┼─────────┼─────────┼──────────────────────────────
0 │ 0 │ uint16 │ Main │ Current │ b0: if 1, Coarse, else Fine
│ │ │ │ │ b1-7: Measurement value.
1 │ 2 │ uint16 │ USB │ Current │ b0: if 1, Coarse, else Fine
│ │ │ │ │ b1-7: Measurement value.
2 │ 4 │ uint16 │ Aux │ Current │ b0: if 1, Coarse, else Fine
│ │ │ │ │ b1-7: Measurement value.
3 │ 6 │ uint16 │ Main │ Voltage │ Measurement value.
"""
# The total number of bytes in a measurement. See the table above.
SIZE = 8
def __init__(self, raw_data, sample_time, sample_type, entry_index):
"""Creates a new LVPM Measurement.
Args:
raw_data: The raw data format of the LvpmMeasurement.
sample_time: The time the sample was recorded.
sample_type: The type of sample that was recorded.
entry_index: The index of the measurement within the packet.
"""
self.values = struct.unpack('>4H', raw_data)
self._sample_time = sample_time
self._sample_type = sample_type
if SampleType.is_calibration(self._sample_type):
# Calibration packets have granularity values determined by whether
# or not the entry was odd or even within the returned packet.
if entry_index % 2 == 0:
self._granularity = Granularity.FINE
else:
self._granularity = Granularity.COARSE
else:
# If it is not a calibration packet, each individual reading (main
# current, usb current, etc) determines granularity value by
# checking the LSB of the measurement value.
self._granularity = None
def __getitem__(self, channel_or_reading):
"""Returns the requested reading for the given channel.
Args:
channel_or_reading: either a Channel or Reading.Voltage.
"""
if channel_or_reading == Reading.VOLTAGE:
return self.values[3]
else:
# Must be a channel. If it is not, this line will throw an
# IndexError, which is what we will want for invalid values.
return self.values[channel_or_reading]
def get_sample_time(self):
"""Returns the time (since the start time) this sample was collected."""
return self._sample_time
def get_sample_type(self):
"""Returns a value contained in SampleType."""
return self._sample_type
def get_calibration_granularity(self):
"""Returns the granularity associated with this packet.
If the packet is not a calibration packet, None is returned.
"""
return self._granularity
class Packet(object):
"""A packet collected directly from serial.read() during sample collection.
Note that the true documentation for this has been lost to time. This class
and documentation uses knowledge that comes from several reverse-engineering
projects. Most of this knowledge comes from
http://wiki/Main/MonsoonProtocol.
The data table looks approximately like this:
Offset │ Format │ Field │ Description
───────┼─────────┼─────────┼────────────────────────────────────────────
0 │ uint8 │ flags │ Bits:
│ │ & │ * b0-3: Sequence number (0-15). Increments
│ │ seq │ each packet
│ │ │ * b4: 1 means over-current or thermal kill
│ │ │ * b5: Main Output, 1 == unit is at voltage,
│ │ │ 0 == output disabled.
│ │ │ * b6-7: reserved.
1 │ uint8 │ packet │ The type of the packet:
│ │ type │ * 0: A data packet
│ │ │ * 1: A zero calibration packet
│ │ │ * 2: A reference calibration packet
2 │ uint8 │ unknown │ Always seems to be 0x00
3 │ uint8 │ unknown │ Always seems to be 0x00 or 0xC4.
4 │ byte[8] │ data │ See LvpmMeasurement.
... │ byte[8] │ data │ Additional LvpmMeasurements.
-1 │ uint8 │ unknown │ Last byte, unknown values. Has been seen to
│ │ │ usually be \x00, or \x84.
Attributes:
_packet_data: The raw data received from the packet.
time_since_start: The timestamp (relative to start) this packet was
collected.
time_since_last_sample: The differential between this packet's
time_since_start and the previous packet's. Note that for the first
packet, this value will be equal to time_since_start.
"""
# The number of bytes before the first packet.
FIRST_MEASUREMENT_OFFSET = 4
def __init__(self, sampled_bytes, time_since_start,
time_since_last_sample):
self._packet_data = sampled_bytes
self.time_since_start = time_since_start
self.time_since_last_sample = time_since_last_sample
num_data_bytes = len(sampled_bytes) - Packet.FIRST_MEASUREMENT_OFFSET
num_packets = num_data_bytes // LvpmMeasurement.SIZE
sample_struct_format = (str(LvpmMeasurement.SIZE) + 's') * num_packets
struct_string = '>2B2x%sx' % sample_struct_format
self._flag_data, self.packet_type, *samples = struct.unpack(
struct_string, sampled_bytes)
self.measurements = [None] * len(samples)
for index, raw_measurement in enumerate(samples):
self.measurements[index] = LvpmMeasurement(
raw_measurement, self._get_sample_time(index),
self.packet_type, index)
def _get_sample_time(self, index):
"""Returns the time the sample at the given index was received.
If multiple samples were captured within the same reading, the samples
are assumed to be uniformly distributed during the time it took to
sample the values.
Args:
index: the index of the individual reading from within the sample.
"""
time_per_sample = self.time_since_last_sample / len(self.measurements)
return time_per_sample * (index + 1) + self.time_since_start
@property
def packet_counter(self):
return self._flag_data & 0x0F
def get_bytes(self, start, end_exclusive):
"""Returns a bytearray spanning from start to the end (exclusive)."""
return self._packet_data[start:end_exclusive]
def __getitem__(self, index):
return self.measurements[index]
def __len__(self):
return len(self.measurements)