blob: f3d63c5c365d1b1c64f41b0c9deab567c2309491 [file] [log] [blame]
# Copyright (C) 2014 The Android Open Source Project
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interface for a USB-connected Monsoon power meter
This file requires gflags, which requires setuptools.
To install setuptools: sudo apt-get install python-setuptools
To install gflags, see
To install pyserial, see
Example usages:
Set the voltage of the device 7536 to 4.0V
python2.6 --voltage=4.0 --serialno 7536
Get 5000hz data from device number 7536, with unlimited number of samples
python2.6 --samples -1 --hz 5000 --serialno 7536
Get 200Hz data for 5 seconds (1000 events) from default device
python2.6 --samples 100 --hz 200
Get unlimited 200Hz data from device attached at /dev/ttyACM0
python2.6 --samples -1 --hz 200 --device /dev/ttyACM0
import fcntl
import os
import select
import signal
import stat
import struct
import sys
import time
import collections
import gflags as flags #
import serial #
class Monsoon:
Provides a simple class to use the power meter, e.g.
mon = monsoon.Monsoon()
mydata = []
while len(mydata) < 1000:
def __init__(self, device=None, serialno=None, wait=1):
Establish a connection to a Monsoon.
By default, opens the first available port, waiting if none are ready.
A particular port can be specified with "device", or a particular Monsoon
can be specified with "serialno" (using the number printed on its back).
With wait=0, IOError is thrown if a device is not immediately available.
self._coarse_ref = self._fine_ref = self._coarse_zero = self._fine_zero = 0
self._coarse_scale = self._fine_scale = 0
self._last_seq = 0
self.start_voltage = 0
if device:
self.ser = serial.Serial(device, timeout=1)
while True: # try all /dev/ttyACM* until we find one we can use
for dev in os.listdir("/dev"):
if not dev.startswith("ttyACM"): continue
tmpname = "/tmp/monsoon.%s.%s" % (os.uname()[0], dev)
self._tempfile = open(tmpname, "w")
os.chmod(tmpname, 0666)
except OSError:
try: # use a lockfile to ensure exclusive access
fcntl.lockf(self._tempfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as e:
print >>sys.stderr, "device %s is in use" % dev
try: # try to open the device
self.ser = serial.Serial("/dev/%s" % dev, timeout=1)
self.StopDataCollection() # just in case
self._FlushInput() # discard stale input
status = self.GetStatus()
except Exception as e:
print >>sys.stderr, "error opening device %s: %s" % (dev, e)
if not status:
print >>sys.stderr, "no response from device %s" % dev
elif serialno and status["serialNumber"] != serialno:
print >>sys.stderr, ("Note: another device serial #%d seen on %s" %
(status["serialNumber"], dev))
self.start_voltage = status["voltage1"]
self._tempfile = None
if not wait: raise IOError("No device found")
print >>sys.stderr, "waiting for device..."
def GetStatus(self):
""" Requests and waits for status. Returns status dictionary. """
# status packet format
"packetType", "firmwareVersion", "protocolVersion",
"mainFineCurrent", "usbFineCurrent", "auxFineCurrent", "voltage1",
"mainCoarseCurrent", "usbCoarseCurrent", "auxCoarseCurrent", "voltage2",
"outputVoltageSetting", "temperature", "status", "leds",
"mainFineResistor", "serialNumber", "sampleRate",
"dacCalLow", "dacCalHigh",
"powerUpCurrentLimit", "runTimeCurrentLimit", "powerUpTime",
"usbFineResistor", "auxFineResistor",
"initialUsbVoltage", "initialAuxVoltage",
"hardwareRevision", "temperatureLimit", "usbPassthroughMode",
"mainCoarseResistor", "usbCoarseResistor", "auxCoarseResistor",
"defMainFineResistor", "defUsbFineResistor", "defAuxFineResistor",
"defMainCoarseResistor", "defUsbCoarseResistor", "defAuxCoarseResistor",
"eventCode", "eventData", ]
self._SendStruct("BBB", 0x01, 0x00, 0x00)
while True: # Keep reading, discarding non-status packets
bytes = self._ReadPacket()
if not bytes: return None
if len(bytes) != struct.calcsize(STATUS_FORMAT) or bytes[0] != "\x10":
print >>sys.stderr, "wanted status, dropped type=0x%02x, len=%d" % (
ord(bytes[0]), len(bytes))
status = dict(zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, bytes)))
assert status["packetType"] == 0x10
for k in status.keys():
if k.endswith("VoltageSetting"):
status[k] = 2.0 + status[k] * 0.01
elif k.endswith("FineCurrent"):
pass # needs calibration data
elif k.endswith("CoarseCurrent"):
pass # needs calibration data
elif k.startswith("voltage") or k.endswith("Voltage"):
status[k] = status[k] * 0.000125
elif k.endswith("Resistor"):
status[k] = 0.05 + status[k] * 0.0001
if k.startswith("aux") or k.startswith("defAux"): status[k] += 0.05
elif k.endswith("CurrentLimit"):
status[k] = 8 * (1023 - status[k]) / 1023.0
return status
def RampVoltage(self, start, end):
v = start
if v < 3.0: v = 3.0 # protocol doesn't support lower than this
while (v < end):
v += .1
def SetVoltage(self, v):
""" Set the output voltage, 0 to disable. """
if v == 0:
self._SendStruct("BBB", 0x01, 0x01, 0x00)
self._SendStruct("BBB", 0x01, 0x01, int((v - 2.0) * 100))
def SetMaxCurrent(self, i):
"""Set the max output current."""
assert i >= 0 and i <= 8
val = 1023 - int((i/8)*1023)
self._SendStruct("BBB", 0x01, 0x0a, val & 0xff)
self._SendStruct("BBB", 0x01, 0x0b, val >> 8)
def SetUsbPassthrough(self, val):
""" Set the USB passthrough mode: 0 = off, 1 = on, 2 = auto. """
self._SendStruct("BBB", 0x01, 0x10, val)
def StartDataCollection(self):
""" Tell the device to start collecting and sending measurement data. """
self._SendStruct("BBB", 0x01, 0x1b, 0x01) # Mystery command
self._SendStruct("BBBBBBB", 0x02, 0xff, 0xff, 0xff, 0xff, 0x03, 0xe8)
def StopDataCollection(self):
""" Tell the device to stop collecting measurement data. """
self._SendStruct("BB", 0x03, 0x00) # stop
def CollectData(self):
""" Return some current samples. Call StartDataCollection() first. """
while True: # loop until we get data or a timeout
bytes = self._ReadPacket()
if not bytes: return None
if len(bytes) < 4 + 8 + 1 or bytes[0] < "\x20" or bytes[0] > "\x2F":
print >>sys.stderr, "wanted data, dropped type=0x%02x, len=%d" % (
ord(bytes[0]), len(bytes))
seq, type, x, y = struct.unpack("BBBB", bytes[:4])
data = [struct.unpack(">hhhh", bytes[x:x+8])
for x in range(4, len(bytes) - 8, 8)]
if self._last_seq and seq & 0xF != (self._last_seq + 1) & 0xF:
print >>sys.stderr, "data sequence skipped, lost packet?"
self._last_seq = seq
if type == 0:
if not self._coarse_scale or not self._fine_scale:
print >>sys.stderr, "waiting for calibration, dropped data packet"
out = []
for main, usb, aux, voltage in data:
if main & 1:
out.append(((main & ~1) - self._coarse_zero) * self._coarse_scale)
out.append((main - self._fine_zero) * self._fine_scale)
return out
elif type == 1:
self._fine_zero = data[0][0]
self._coarse_zero = data[1][0]
# print >>sys.stderr, "zero calibration: fine 0x%04x, coarse 0x%04x" % (
# self._fine_zero, self._coarse_zero)
elif type == 2:
self._fine_ref = data[0][0]
self._coarse_ref = data[1][0]
# print >>sys.stderr, "ref calibration: fine 0x%04x, coarse 0x%04x" % (
# self._fine_ref, self._coarse_ref)
print >>sys.stderr, "discarding data packet type=0x%02x" % type
if self._coarse_ref != self._coarse_zero:
self._coarse_scale = 2.88 / (self._coarse_ref - self._coarse_zero)
if self._fine_ref != self._fine_zero:
self._fine_scale = 0.0332 / (self._fine_ref - self._fine_zero)
def _SendStruct(self, fmt, *args):
""" Pack a struct (without length or checksum) and send it. """
data = struct.pack(fmt, *args)
data_len = len(data) + 1
checksum = (data_len + sum(struct.unpack("B" * len(data), data))) % 256
out = struct.pack("B", data_len) + data + struct.pack("B", checksum)
def _ReadPacket(self):
""" Read a single data record as a string (without length or checksum). """
len_char =
if not len_char:
print >>sys.stderr, "timeout reading from serial port"
return None
data_len = struct.unpack("B", len_char)
data_len = ord(len_char)
if not data_len: return ""
result =
if len(result) != data_len: return None
body = result[:-1]
checksum = (data_len + sum(struct.unpack("B" * len(body), body))) % 256
if result[-1] != struct.pack("B", checksum):
print >>sys.stderr, "invalid checksum from serial port"
return None
return result[:-1]
def _FlushInput(self):
""" Flush all read data until no more available. """
flushed = 0
while True:
ready_r, ready_w, ready_x =[self.ser], [], [self.ser], 0)
if len(ready_x) > 0:
print >>sys.stderr, "exception from serial port"
return None
elif len(ready_r) > 0:
flushed += 1 # This may cause underlying buffering.
self.ser.flush() # Flush the underlying buffer too.
if flushed > 0:
print >>sys.stderr, "dropped >%d bytes" % flushed
def main(argv):
""" Simple command-line interface for Monsoon."""
useful_flags = ["voltage", "status", "usbpassthrough", "samples", "current"]
if not [f for f in useful_flags if FLAGS.get(f, None) is not None]:
print __doc__.strip()
print FLAGS.MainModuleHelp()
if FLAGS.avg and FLAGS.avg < 0:
print "--avg must be greater than 0"
mon = Monsoon(device=FLAGS.device, serialno=FLAGS.serialno)
if FLAGS.voltage is not None:
if FLAGS.ramp is not None:
mon.RampVoltage(mon.start_voltage, FLAGS.voltage)
if FLAGS.current is not None:
if FLAGS.status:
items = sorted(mon.GetStatus().items())
print "\n".join(["%s: %s" % item for item in items])
if FLAGS.usbpassthrough:
if FLAGS.usbpassthrough == 'off':
elif FLAGS.usbpassthrough == 'on':
elif FLAGS.usbpassthrough == 'auto':
sys.exit('bad passthrough flag: %s' % FLAGS.usbpassthrough)
if FLAGS.samples:
# Make sure state is normal
status = mon.GetStatus()
native_hz = status["sampleRate"] * 1000
# Collect and average samples as specified
# In case FLAGS.hz doesn't divide native_hz exactly, use this invariant:
# 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz
# This is the error accumulator in a variation of Bresenham's algorithm.
emitted = offset = 0
collected = []
history_deque = collections.deque() # past n samples for rolling average
last_flush = time.time()
while emitted < FLAGS.samples or FLAGS.samples == -1:
# The number of raw samples to consume before emitting the next output
need = (native_hz - offset + FLAGS.hz - 1) / FLAGS.hz
if need > len(collected): # still need more input samples
samples = mon.CollectData()
if not samples: break
# Have enough data, generate output samples.
# Adjust for consuming 'need' input samples.
offset += need * FLAGS.hz
while offset >= native_hz: # maybe multiple, if FLAGS.hz > native_hz
this_sample = sum(collected[:need]) / need
if FLAGS.timestamp: print int(time.time()),
if FLAGS.avg:
if len(history_deque) > FLAGS.avg: history_deque.pop()
print "%f %f" % (this_sample,
sum(history_deque) / len(history_deque))
print "%f" % this_sample
offset -= native_hz
emitted += 1 # adjust for emitting 1 output sample
collected = collected[need:]
now = time.time()
if now - last_flush >= 0.99: # flush every second
last_flush = now
except KeyboardInterrupt:
print >>sys.stderr, "interrupted"
if __name__ == '__main__':
# Define flags here to avoid conflicts with people who use us as a library
flags.DEFINE_boolean("status", None, "Print power meter status")
flags.DEFINE_integer("avg", None,
"Also report average over last n data points")
flags.DEFINE_float("voltage", None, "Set output voltage (0 for off)")
flags.DEFINE_float("current", None, "Set max output current")
flags.DEFINE_string("usbpassthrough", None, "USB control (on, off, auto)")
flags.DEFINE_integer("samples", None, "Collect and print this many samples")
flags.DEFINE_integer("hz", 5000, "Print this many samples/sec")
flags.DEFINE_string("device", None,
"Path to the device in /dev/... (ex:/dev/ttyACM1)")
flags.DEFINE_integer("serialno", None, "Look for a device with this serial number")
flags.DEFINE_boolean("timestamp", None,
"Also print integer (seconds) timestamp on each line")
flags.DEFINE_boolean("ramp", True, "Gradually increase voltage")