# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import its.image
import its.device
import its.objects
import time
import math
import pylab
import os.path
import matplotlib
import matplotlib.pyplot
import json
import Image
import numpy
import cv2
import bisect
import scipy.spatial
import sys

NAME = os.path.basename(__file__).split(".")[0]

# Capture 210 QVGA frames (which is 7s at 30fps).
N = 210
W,H = 320,240

# Parameters for the cv2.goodFeaturesToTrack corner detector and the
# cv2.calcOpticalFlowPyrLK feature tracker used in get_cam_rotations.
FEATURE_PARAMS = dict( maxCorners = 50,
                       qualityLevel = 0.3,
                       minDistance = 7,
                       blockSize = 7 )
LK_PARAMS = dict( winSize = (15, 15),
                  maxLevel = 2,
                  criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
                              10, 0.03))

# Constants to convert between different time units (for clarity).
SEC_TO_NSEC = 1000*1000*1000.0
SEC_TO_MSEC = 1000.0
MSEC_TO_NSEC = 1000*1000.0
MSEC_TO_SEC = 1/1000.0
NSEC_TO_SEC = 1/(1000*1000*1000.0)
NSEC_TO_MSEC = 1/(1000*1000.0)

# Pass/fail thresholds.
THRESH_MAX_CORR_DIST = 0.005
THRESH_MAX_SHIFT_MS = 2
THRESH_MIN_ROT = 0.001
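
# Note on the correlation metric used below:
# scipy.spatial.distance.correlation(u, v) returns one minus the Pearson
# correlation coefficient of its two inputs, so two traces that are perfectly
# (positively) correlated have a distance of 0.0 and uncorrelated traces have
# a distance of around 1.0. THRESH_MAX_CORR_DIST therefore requires the
# camera and gyro rotation traces to be very strongly correlated.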


def main():
    """Test if image and motion sensor events are well synchronized.

    The instructions for running this test are in the SensorFusion.pdf file
    in the same directory as this test.

    The command-line argument "replay" may optionally be provided. Without
    this argument, the test will collect a new set of camera+gyro data from
    the device and then analyze it (and it will also dump this data to files
    in the current directory). If the "replay" argument is provided, then the
    script will instead load the dumped data from a previous run and analyze
    that. This can be helpful for developers who are digging for additional
    information on their measurements.
    """
    # Collect or load the camera+gyro data. All gyro events as well as camera
    # timestamps are in the "events" dictionary, and "frames" is a list of
    # RGB images as numpy arrays.
    if "replay" not in sys.argv:
        events, frames = collect_data()
    else:
        events, frames = load_data()

    # Sanity check that the camera timestamps are enclosed by the sensor
    # timestamps. This will catch bugs where the camera and gyro timestamps
    # go completely out of sync.
    cam_times = get_cam_times(events["cam"])
    min_cam_time = min(cam_times) * NSEC_TO_SEC
    max_cam_time = max(cam_times) * NSEC_TO_SEC
    gyro_times = [e["time"] for e in events["gyro"]]
    min_gyro_time = min(gyro_times) * NSEC_TO_SEC
    max_gyro_time = max(gyro_times) * NSEC_TO_SEC
    if not (min_cam_time > min_gyro_time and max_cam_time < max_gyro_time):
        print "Test failed: camera timestamps [%f,%f] " \
              "are not enclosed by gyro timestamps [%f, %f]" % (
              min_cam_time, max_cam_time, min_gyro_time, max_gyro_time)
        assert(0)

    # Compute the camera rotation displacements (rad) between each pair of
    # adjacent frames.
    cam_rots = get_cam_rotations(frames)
    if max(abs(cam_rots)) < THRESH_MIN_ROT:
        print "Test failed: device wasn't moved enough"
        assert(0)

    # Find the best offset (time-shift) to align the gyro and camera motion
    # traces; this function integrates the shifted gyro data between camera
    # samples for a range of candidate shift values, and returns the shift
    # that results in the best correlation.
    offset = get_best_alignment_offset(cam_times, cam_rots, events["gyro"])

    # Plot the camera and gyro traces after applying the best shift.
    cam_times = cam_times + offset*SEC_TO_NSEC
    gyro_rots = get_gyro_rotations(events["gyro"], cam_times)
    plot_rotations(cam_rots, gyro_rots)

    # Pass/fail based on the offset and also the correlation distance.
    dist = scipy.spatial.distance.correlation(cam_rots, gyro_rots)
    print "Best correlation of %f at shift of %.2fms" % (
            dist, offset*SEC_TO_MSEC)
    assert(dist < THRESH_MAX_CORR_DIST)
    assert(abs(offset) < THRESH_MAX_SHIFT_MS*MSEC_TO_SEC)


def get_best_alignment_offset(cam_times, cam_rots, gyro_events):
    """Find the best offset to align the camera and gyro traces.

    Uses a correlation distance metric between the curves, where a smaller
    value means that the curves are better-correlated.

    Args:
        cam_times: Array of N camera times, one for each frame.
        cam_rots: Array of N-1 camera rotation displacements (rad).
        gyro_events: List of gyro event objects.

    Returns:
        Offset (seconds) of the best alignment.
    """
    # Measure the corr. dist. over shifts of up to +/- 100ms (1ms step size),
    # and get the shift corresponding to the best (lowest) score.
    candidates = range(-100,101)
    dists = []
    for shift in candidates:
        times = cam_times + shift*MSEC_TO_NSEC
        gyro_rots = get_gyro_rotations(gyro_events, times)
        dists.append(scipy.spatial.distance.correlation(cam_rots, gyro_rots))
    best_corr_dist = min(dists)
    best_shift = candidates[dists.index(best_corr_dist)]

    # Fit a curve to the corr. dist. data to measure the minimum more
    # accurately, by looking at the correlation distances within a range of
    # +/- 20ms around the measured best shift; note that this will use fewer
    # than the full +/- 20ms range for the curve fit if the measured best
    # shift (which is used as the center of the fit) is within 20ms of the
    # edge of the +/- 100ms candidate range.
    i = len(dists)/2 + best_shift
    candidates = candidates[i-20:i+21]
    dists = dists[i-20:i+21]
    a,b,c = numpy.polyfit(candidates, dists, 2)
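    # The minimum of the fitted parabola y = a*x^2 + b*x + c is where its
    # derivative is zero: dy/dx = 2*a*x + b = 0, i.e. x = -b/(2*a). This is
    # only a minimum when a > 0, which is checked below.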
    exact_best_shift = -b/(2*a)
    if abs(best_shift - exact_best_shift) > 2.0 or a <= 0 or c <= 0:
        print "Test failed: bad fit to time-shift curve"
        assert(0)

    # Plot the shift-search data and the fitted parabola.
    xfit = [x/10.0 for x in xrange(candidates[0]*10,candidates[-1]*10)]
    yfit = [a*x*x+b*x+c for x in xfit]
    fig = matplotlib.pyplot.figure()
    pylab.plot(candidates, dists, 'r', label="data")
    pylab.plot(xfit, yfit, 'b', label="fit")
    pylab.plot([exact_best_shift+x for x in [-0.1,0,0.1]], [0,0.01,0], 'b')
    pylab.xlabel("Relative horizontal shift between curves (ms)")
    pylab.ylabel("Correlation distance")
    pylab.legend()
    matplotlib.pyplot.savefig("%s_plot_shifts.png" % (NAME))

    return exact_best_shift * MSEC_TO_SEC


def plot_rotations(cam_rots, gyro_rots):
    """Save a plot of the camera vs. gyro rotational measurements.

    Args:
        cam_rots: Array of N-1 camera rotation measurements (rad).
        gyro_rots: Array of N-1 gyro rotation measurements (rad).
    """
    # For the plot, scale the rotations to be in degrees.
    fig = matplotlib.pyplot.figure()
    cam_rots = cam_rots * (360/(2*math.pi))
    gyro_rots = gyro_rots * (360/(2*math.pi))
    pylab.plot(range(len(cam_rots)), cam_rots, 'r', label="camera")
    pylab.plot(range(len(gyro_rots)), gyro_rots, 'b', label="gyro")
    pylab.legend()
    pylab.xlabel("Camera frame number")
    pylab.ylabel("Angular displacement between adjacent camera frames (deg)")
    pylab.xlim([0, len(cam_rots)])
    matplotlib.pyplot.savefig("%s_plot.png" % (NAME))


def get_gyro_rotations(gyro_events, cam_times):
    """Get the rotation values of the gyro.

    Integrates the gyro data between each pair of camera frame times to
    compute an angular displacement. Uses a simple trapezoidal approximation
    (averaging adjacent angular-rate samples) to implement the integration.

    Args:
        gyro_events: List of gyro event objects.
        cam_times: Array of N camera times, one for each frame.

    Returns:
        Array of N-1 gyro rotation measurements (rad).
    """
    all_times = numpy.array([e["time"] for e in gyro_events])
    all_rots = numpy.array([e["z"] for e in gyro_events])
    gyro_rots = []

    # Integrate the gyro data between each pair of camera frame times.
    for icam in range(len(cam_times)-1):
        # Get the window of gyro samples within the current pair of frames.
        tcam0 = cam_times[icam]
        tcam1 = cam_times[icam+1]
        igyrowindow0 = bisect.bisect(all_times, tcam0)
        igyrowindow1 = bisect.bisect(all_times, tcam1)
        sgyro = 0

        # Integrate samples within the window.
        for igyro in range(igyrowindow0, igyrowindow1):
            vgyro0 = all_rots[igyro]
            vgyro1 = all_rots[igyro+1]
            tgyro0 = all_times[igyro]
            tgyro1 = all_times[igyro+1]
            vgyro = 0.5 * (vgyro0 + vgyro1)
            deltatgyro = (tgyro1 - tgyro0) * NSEC_TO_SEC
            sgyro += vgyro * deltatgyro

        # Handle the fractional intervals at the sides of the window.
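        # For the partial gyro interval that straddles each window edge, only
        # the portion that falls inside [tcam0, tcam1] is counted: a fraction
        # f of the left-edge interval precedes tcam0 (so its weight is 1-f),
        # and a fraction f of the right-edge interval precedes tcam1 (so its
        # weight is f).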
        for side,igyro in enumerate([igyrowindow0-1, igyrowindow1]):
            vgyro0 = all_rots[igyro]
            vgyro1 = all_rots[igyro+1]
            tgyro0 = all_times[igyro]
            tgyro1 = all_times[igyro+1]
            vgyro = 0.5 * (vgyro0 + vgyro1)
            deltatgyro = (tgyro1 - tgyro0) * NSEC_TO_SEC
            if side == 0:
                f = (tcam0 - tgyro0) / (tgyro1 - tgyro0)
                sgyro += vgyro * deltatgyro * (1.0 - f)
            else:
                f = (tcam1 - tgyro0) / (tgyro1 - tgyro0)
                sgyro += vgyro * deltatgyro * f

        gyro_rots.append(sgyro)

    gyro_rots = numpy.array(gyro_rots)
    return gyro_rots


def get_cam_rotations(frames):
    """Get the rotations of the camera between each pair of frames.

    Takes N frames and returns N-1 angular displacements corresponding to
    the rotations between adjacent pairs of frames, in radians.

    Args:
        frames: List of N images (as RGB numpy arrays).

    Returns:
        Array of N-1 camera rotation measurements (rad).
    """
    gframes = []
    for frame in frames:
        frame = (frame * 255.0).astype(numpy.uint8)
        gframes.append(cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY))
    rots = []
    for i in range(1,len(gframes)):
        gframe0 = gframes[i-1]
        gframe1 = gframes[i]
        # Find corners to track in the first frame of the pair, then track
        # them into the second frame with pyramidal Lucas-Kanade optical
        # flow; st[j] is 1 for each feature that was successfully tracked.
        p0 = cv2.goodFeaturesToTrack(gframe0, mask=None, **FEATURE_PARAMS)
        p1,st,_ = cv2.calcOpticalFlowPyrLK(gframe0, gframe1, p0, None,
                                           **LK_PARAMS)
        tform = procrustes_rotation(p0[st==1], p1[st==1])
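        # tform is (ideally) a 2x2 rotation matrix of the form
        # [[cos(t), -sin(t)], [sin(t), cos(t)]], so the rotation angle can
        # be recovered as -atan2(tform[0,1], tform[0,0]) =
        # -atan2(-sin(t), cos(t)) = t.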
        # TODO: Choose the sign for the rotation so the cam matches the gyro
        rot = -math.atan2(tform[0, 1], tform[0, 0])
        rots.append(rot)
        if i == 1:
            # Save a debug visualization of the features that are being
            # tracked in the first frame.
            frame = frames[i]
            for x,y in p0[st==1]:
                cv2.circle(frame, (x,y), 3, (100,100,255), -1)
            its.image.write_image(frame, "%s_features.jpg"%(NAME))
    return numpy.array(rots)


def get_cam_times(cam_events):
    """Get the camera frame times.

    Args:
        cam_events: List of (start_exposure, exposure_time, readout_duration)
            tuples, one per captured frame, with times in nanoseconds.

    Returns:
        frame_times: Array of N times, one corresponding to the "middle" of
            the exposure of each frame.
    """
    # Assign a time to each frame that assumes that the image is instantly
    # captured in the middle of its exposure.
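    # For example, a frame with start_exposure = 1.000s, exposure_time = 20ms,
    # and readout_duration = 10ms gets stamped at 1.000s + (20ms + 10ms)/2 =
    # 1.015s (with all values in ns in the actual arrays).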
    starts = numpy.array([start for start,exptime,readout in cam_events])
    exptimes = numpy.array([exptime for start,exptime,readout in cam_events])
    readouts = numpy.array([readout for start,exptime,readout in cam_events])
    frame_times = starts + (exptimes + readouts) / 2.0
    return frame_times


def load_data():
    """Load a set of previously captured data.

    Returns:
        events: Dictionary containing all gyro events and cam timestamps.
        frames: List of RGB images as numpy arrays.
    """
    with open("%s_events.txt"%(NAME), "r") as f:
        events = json.loads(f.read())
    n = len(events["cam"])
    frames = []
    for i in range(n):
        img = Image.open("%s_frame%03d.jpg"%(NAME,i))
        w,h = img.size[0:2]
        frames.append(numpy.array(img).reshape(h,w,3) / 255.0)
    return events, frames


def collect_data():
    """Capture a new set of data from the device.

    Captures both motion data and camera frames, while the user is moving
    the device in a prescribed manner.

    Returns:
        events: Dictionary containing all gyro events and cam timestamps.
        frames: List of RGB images as numpy arrays.
    """
    with its.device.ItsSession() as cam:
        print "Starting sensor event collection"
        cam.start_sensor_events()

        # Sleep a few seconds for gyro events to stabilize.
        time.sleep(5)

        # TODO: Ensure that OIS is disabled; set to DISABLE and wait some time.

        # Capture the frames.
        props = cam.get_camera_properties()
        fmt = {"format":"yuv", "width":W, "height":H}
        s,e,_,_,_ = cam.do_3a(get_results=True)
        req = its.objects.manual_capture_request(s, e)
        print "Capturing %dx%d with sens. %d, exp. time %.1fms" % (
                W, H, s, e*NSEC_TO_MSEC)
        caps = cam.do_capture([req]*N, fmt)

        # Get the gyro events.
        print "Reading out sensor events"
        gyro = cam.get_sensor_events()["gyro"]

        # Combine the events into a single structure.
        print "Dumping event data"
        starts = [c["metadata"]["android.sensor.timestamp"] for c in caps]
        exptimes = [c["metadata"]["android.sensor.exposureTime"] for c in caps]
        readouts = [c["metadata"]["android.sensor.rollingShutterSkew"]
                    for c in caps]
        events = {"gyro": gyro, "cam": zip(starts,exptimes,readouts)}
        with open("%s_events.txt"%(NAME), "w") as f:
            f.write(json.dumps(events))

        # Convert the frames to RGB.
        print "Dumping frames"
        frames = []
        for i,c in enumerate(caps):
            img = its.image.convert_capture_to_rgb_image(c)
            frames.append(img)
            its.image.write_image(img, "%s_frame%03d.jpg"%(NAME,i))

        return events, frames


def procrustes_rotation(X, Y):
    """Performs a Procrustes analysis to conform the points in Y to X.

    Procrustes analysis determines a linear transformation (translation,
    reflection, orthogonal rotation and scaling) of the points in Y to best
    conform them to the points in matrix X, using the sum of squared errors
    as the goodness-of-fit criterion.

    Args:
        X, Y: Matrices of target and input coordinates.

    Returns:
        The rotation component of the transformation that maps X to Y.
    """
    X0 = (X-X.mean(0)) / numpy.sqrt(((X-X.mean(0))**2.0).sum())
    Y0 = (Y-Y.mean(0)) / numpy.sqrt(((Y-Y.mean(0))**2.0).sum())
    U,s,Vt = numpy.linalg.svd(numpy.dot(X0.T, Y0), full_matrices=False)
    return numpy.dot(Vt.T, U.T)
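
# A quick sanity check of procrustes_rotation (an illustrative sketch, not
# part of the test): rotating a set of 2-D points by a known angle and running
# the original and rotated point sets through procrustes_rotation() should
# recover a rotation by that same angle.
#
#   theta = 0.1
#   R = numpy.array([[math.cos(theta), -math.sin(theta)],
#                    [math.sin(theta),  math.cos(theta)]])
#   X = numpy.random.rand(20, 2)
#   Y = numpy.dot(X, R.T)  # each row (point) of X rotated by theta
#   T = procrustes_rotation(X, Y)
#   assert abs(math.atan2(T[1,0], T[0,0]) - theta) < 1e-6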


if __name__ == '__main__':
    main()