# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import bisect
import json
import math
import os.path
import sys
import time
import cv2
import its.caps
import its.device
import its.image
import its.objects
import matplotlib
from matplotlib import pylab
import matplotlib.pyplot
import numpy
from PIL import Image
import scipy.spatial
NAME = os.path.basename(__file__).split(".")[0]
W, H = 640, 480
FPS = 30
TEST_LENGTH = 7 # seconds
FEATURE_MARGIN = 0.20  # Only take feature points from the center 20%
                       # so that the rotations measured have much less
                       # rolling shutter effect.
MIN_FEATURE_PTS = 30 # Minimum number of feature points required to
# perform rotation analysis
MAX_CAM_FRM_RANGE_SEC = 9.0  # Maximum allowed camera frame range (sec). When
                             # this value is significantly larger than 7
                             # seconds, the system is usually in a busy/bad
                             # state.
MIN_GYRO_SMP_RATE = 100.0  # Minimum gyro sample rate (samples/sec)
FEATURE_PARAMS = dict(maxCorners=240,
qualityLevel=0.3,
minDistance=7,
blockSize=7)
LK_PARAMS = dict(winSize=(15, 15),
maxLevel=2,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
10, 0.03))
# Constants to convert between different units (for clarity).
SEC_TO_NSEC = 1000*1000*1000.0
SEC_TO_MSEC = 1000.0
MSEC_TO_NSEC = 1000*1000.0
MSEC_TO_SEC = 1/1000.0
NSEC_TO_SEC = 1/(1000*1000*1000.0)
NSEC_TO_MSEC = 1/(1000*1000.0)
CM_TO_M = 1/100.0
# PASS/FAIL thresholds.
THRESH_MAX_CORR_DIST = 0.005
THRESH_MAX_SHIFT_MS = 1
THRESH_MIN_ROT = 0.001
# Lens facing
FACING_FRONT = 0
FACING_BACK = 1
FACING_EXTERNAL = 2
# Chart distance
CHART_DISTANCE = 25 # cm


def main():
"""Test if image and motion sensor events are well synchronized.
The instructions for running this test are in the SensorFusion.pdf file in
the same directory as this test.
Note that if fps*test_length is too large, write speeds may become a
bottleneck and camera capture will slow down or stop.
Command line arguments:
fps: FPS to capture with during the test
img_size: Comma-separated dimensions of captured images (defaults to
640x480). Ex: "img_size=<width>,<height>"
replay: Without this argument, the test will collect a new set of
camera+gyro data from the device and then analyze it (and
it will also dump this data to files in the current
directory). If the "replay" argument is provided, then the
script will instead load the dumped data from a previous
run and analyze that instead. This can be helpful for
developers who are digging for additional information on
their measurements.
test_length: How long the test should run for (in seconds)
"""
fps = FPS
w, h = W, H
test_length = TEST_LENGTH
for s in sys.argv[1:]:
if s[:4] == "fps=" and len(s) > 4:
fps = int(s[4:])
elif s[:9] == "img_size=" and len(s) > 9:
# Split by comma and convert each dimension to int.
[w, h] = map(int, s[9:].split(","))
elif s[:12] == "test_length=" and len(s) > 12:
test_length = float(s[12:])
# Collect or load the camera+gyro data. All gyro events as well as camera
# timestamps are in the "events" dictionary, and "frames" is a list of
# RGB images as numpy arrays.
if "replay" not in sys.argv:
if w * h > 640 * 480 or fps * test_length > 300:
            warning_str = (
                "Warning: Your test parameters may require fast write speeds "
                "to run smoothly. If you run into problems, consider smaller "
                "values of 'img_size', 'fps', or 'test_length'."
            )
            print warning_str
events, frames = collect_data(fps, w, h, test_length)
else:
events, frames, _, h = load_data()
    # Sanity check that the camera timestamps are enclosed by the gyro
    # timestamps. This will catch bugs where camera and gyro timestamps go
    # completely out of sync.
cam_times = get_cam_times(events["cam"])
min_cam_time = min(cam_times) * NSEC_TO_SEC
max_cam_time = max(cam_times) * NSEC_TO_SEC
gyro_times = [e["time"] for e in events["gyro"]]
min_gyro_time = min(gyro_times) * NSEC_TO_SEC
max_gyro_time = max(gyro_times) * NSEC_TO_SEC
if not (min_cam_time > min_gyro_time and max_cam_time < max_gyro_time):
fail_str = ("Test failed: "
"camera timestamps [%f,%f] "
"are not enclosed by "
"gyro timestamps [%f, %f]"
) % (min_cam_time, max_cam_time,
min_gyro_time, max_gyro_time)
print fail_str
assert 0
cam_frame_range = max_cam_time - min_cam_time
gyro_time_range = max_gyro_time - min_gyro_time
gyro_smp_per_sec = len(gyro_times) / gyro_time_range
print "Camera frame range", max_cam_time - min_cam_time
print "Gyro samples per second", gyro_smp_per_sec
assert cam_frame_range < MAX_CAM_FRM_RANGE_SEC
assert gyro_smp_per_sec > MIN_GYRO_SMP_RATE
# Compute the camera rotation displacements (rad) between each pair of
# adjacent frames.
cam_rots = get_cam_rotations(frames, events["facing"], h)
if max(abs(cam_rots)) < THRESH_MIN_ROT:
print "Device wasn't moved enough"
assert 0
    # Find the best offset (time-shift) to align the gyro and camera motion
    # traces; this function integrates the shifted gyro data between camera
    # samples for a range of candidate shift values, and returns the shift
    # that results in the best correlation.
offset = get_best_alignment_offset(cam_times, cam_rots, events["gyro"])
# Plot the camera and gyro traces after applying the best shift.
cam_times += offset*SEC_TO_NSEC
gyro_rots = get_gyro_rotations(events["gyro"], cam_times)
plot_rotations(cam_rots, gyro_rots)
# Pass/fail based on the offset and also the correlation distance.
dist = scipy.spatial.distance.correlation(cam_rots, gyro_rots)
print "Best correlation of %f at shift of %.2fms"%(dist, offset*SEC_TO_MSEC)
assert dist < THRESH_MAX_CORR_DIST
assert abs(offset) < THRESH_MAX_SHIFT_MS*MSEC_TO_SEC


def get_best_alignment_offset(cam_times, cam_rots, gyro_events):
"""Find the best offset to align the camera and gyro traces.
Uses a correlation distance metric between the curves, where a smaller
value means that the curves are better-correlated.
Args:
cam_times: Array of N camera times, one for each frame.
cam_rots: Array of N-1 camera rotation displacements (rad).
gyro_events: List of gyro event objects.
Returns:
Offset (seconds) of the best alignment.
"""
# Measure the corr. dist. over a shift of up to +/- 50ms (0.5ms step size).
# Get the shift corresponding to the best (lowest) score.
candidates = numpy.arange(-50, 50.5, 0.5).tolist()
dists = []
for shift in candidates:
times = cam_times + shift*MSEC_TO_NSEC
gyro_rots = get_gyro_rotations(gyro_events, times)
dists.append(scipy.spatial.distance.correlation(cam_rots, gyro_rots))
best_corr_dist = min(dists)
best_shift = candidates[dists.index(best_corr_dist)]
print "Best shift without fitting is ", best_shift, "ms"
    # Fit a curve to the corr. dist. data to measure the minimum more
    # accurately, by looking at the correlation distances within a range of
    # +/- 10ms from the measured best score; note that this will use fewer
    # than the full +/- 10ms range for the curve fit if the measured best
    # shift (which is used as the center of the fit) is within 10ms of the
    # edge of the +/- 50ms candidate range.
    i = dists.index(best_corr_dist)
    candidates = candidates[max(0, i-20):i+21]
    dists = dists[max(0, i-20):i+21]
a, b, c = numpy.polyfit(candidates, dists, 2)
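    # The fitted parabola y = a*x^2 + b*x + c has dy/dx = 2*a*x + b, so its
    # vertex (the minimum, when a > 0) sits at x = -b/(2*a); use that as the
    # sub-sample best shift.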
exact_best_shift = -b/(2*a)
if abs(best_shift - exact_best_shift) > 2.0 or a <= 0 or c <= 0:
print "Test failed; bad fit to time-shift curve"
print "best_shift %f, exact_best_shift %f, a %f, c %f" % (
best_shift, exact_best_shift, a, c)
assert 0
xfit = numpy.arange(candidates[0], candidates[-1], 0.05).tolist()
yfit = [a*x*x+b*x+c for x in xfit]
matplotlib.pyplot.figure()
pylab.plot(candidates, dists, "r", label="data")
pylab.plot(xfit, yfit, "", label="fit")
pylab.plot([exact_best_shift+x for x in [-0.1, 0, 0.1]], [0, 0.01, 0], "b")
pylab.xlabel("Relative horizontal shift between curves (ms)")
pylab.ylabel("Correlation distance")
pylab.legend()
matplotlib.pyplot.savefig("%s_plot_shifts.png" % (NAME))
return exact_best_shift * MSEC_TO_SEC
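

# Note on the metric used in main() and above:
# scipy.spatial.distance.correlation(u, v) returns 1 minus the Pearson
# correlation coefficient of u and v, so two traces that match up to an
# affine scaling have a distance near 0. A minimal sketch (not part of the
# test):
#
#   >>> u = numpy.array([0.0, 1.0, 2.0, 3.0])
#   >>> scipy.spatial.distance.correlation(u, 2*u + 1)  # ~0.0 (perfect)

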
def plot_rotations(cam_rots, gyro_rots):
"""Save a plot of the camera vs. gyro rotational measurements.
Args:
cam_rots: Array of N-1 camera rotation measurements (rad).
gyro_rots: Array of N-1 gyro rotation measurements (rad).
"""
# For the plot, scale the rotations to be in degrees.
scale = 360/(2*math.pi)
matplotlib.pyplot.figure()
cam_rots *= scale
gyro_rots *= scale
pylab.plot(range(len(cam_rots)), cam_rots, "r", label="camera")
pylab.plot(range(len(gyro_rots)), gyro_rots, "b", label="gyro")
pylab.legend()
pylab.xlabel("Camera frame number")
pylab.ylabel("Angular displacement between adjacent camera frames (deg)")
pylab.xlim([0, len(cam_rots)])
matplotlib.pyplot.savefig("%s_plot.png" % (NAME))


def get_gyro_rotations(gyro_events, cam_times):
"""Get the rotation values of the gyro.
Integrates the gyro data between each camera frame to compute an angular
displacement.
Args:
gyro_events: List of gyro event objects.
cam_times: Array of N camera times, one for each frame.
Returns:
Array of N-1 gyro rotation measurements (rad).
"""
all_times = numpy.array([e["time"] for e in gyro_events])
all_rots = numpy.array([e["z"] for e in gyro_events])
gyro_rots = []
# Integrate the gyro data between each pair of camera frame times.
for icam in range(len(cam_times)-1):
# Get the window of gyro samples within the current pair of frames.
tcam0 = cam_times[icam]
tcam1 = cam_times[icam+1]
igyrowindow0 = bisect.bisect(all_times, tcam0)
igyrowindow1 = bisect.bisect(all_times, tcam1)
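        # bisect.bisect returns the insertion index to the right of any
        # equal entries, so range(igyrowindow0, igyrowindow1) covers the
        # gyro intervals [all_times[i], all_times[i+1]] whose left endpoints
        # lie in (tcam0, tcam1]; each one is integrated below using the
        # sample value at its right endpoint (a right Riemann sum).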
sgyro = 0
# Integrate samples within the window.
for igyro in range(igyrowindow0, igyrowindow1):
vgyro = all_rots[igyro+1]
tgyro0 = all_times[igyro]
tgyro1 = all_times[igyro+1]
deltatgyro = (tgyro1 - tgyro0) * NSEC_TO_SEC
sgyro += vgyro * deltatgyro
# Handle the fractional intervals at the sides of the window.
for side, igyro in enumerate([igyrowindow0-1, igyrowindow1]):
vgyro = all_rots[igyro+1]
tgyro0 = all_times[igyro]
tgyro1 = all_times[igyro+1]
deltatgyro = (tgyro1 - tgyro0) * NSEC_TO_SEC
if side == 0:
f = (tcam0 - tgyro0) / (tgyro1 - tgyro0)
sgyro += vgyro * deltatgyro * (1.0 - f)
else:
f = (tcam1 - tgyro0) / (tgyro1 - tgyro0)
sgyro += vgyro * deltatgyro * f
gyro_rots.append(sgyro)
gyro_rots = numpy.array(gyro_rots)
return gyro_rots


def get_cam_rotations(frames, facing, h):
"""Get the rotations of the camera between each pair of frames.
Takes N frames and returns N-1 angular displacements corresponding to the
rotations between adjacent pairs of frames, in radians.
Args:
frames: List of N images (as RGB numpy arrays).
facing: Direction camera is facing
h: Pixel height of each frame
Returns:
Array of N-1 camera rotation measurements (rad).
"""
gframes = []
for frame in frames:
frame = (frame * 255.0).astype(numpy.uint8)
gframes.append(cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY))
rots = []
ymin = h*(1-FEATURE_MARGIN)/2
ymax = h*(1+FEATURE_MARGIN)/2
for i in range(1, len(gframes)):
gframe0 = gframes[i-1]
gframe1 = gframes[i]
p0 = cv2.goodFeaturesToTrack(gframe0, mask=None, **FEATURE_PARAMS)
# p0's shape is N * 1 * 2
mask = (p0[:, 0, 1] >= ymin) & (p0[:, 0, 1] <= ymax)
p0_filtered = p0[mask]
num_features = len(p0_filtered)
if num_features < MIN_FEATURE_PTS:
print "Not enough feature points in frame %s" % str(i-1).zfill(3)
print "Need at least %d features, got %d" % (
MIN_FEATURE_PTS, num_features)
assert 0
else:
print "Number of features in frame %s is %d" % (
str(i-1).zfill(3), num_features)
p1, st, _ = cv2.calcOpticalFlowPyrLK(gframe0, gframe1, p0_filtered,
None, **LK_PARAMS)
tform = procrustes_rotation(p0_filtered[st == 1], p1[st == 1])
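        # tform is a 2x2 rotation matrix. For R = [[cos(t), -sin(t)],
        # [sin(t), cos(t)]], atan2(R[0, 1], R[0, 0]) evaluates to -t; the
        # sign is flipped per lens facing, presumably so that the camera
        # rotation matches the gyro's z-axis sign convention for each
        # sensor orientation.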
if facing == FACING_BACK:
rot = -math.atan2(tform[0, 1], tform[0, 0])
elif facing == FACING_FRONT:
rot = math.atan2(tform[0, 1], tform[0, 0])
else:
print "Unknown lens facing", facing
assert 0
rots.append(rot)
        if i == 1:
            # Save a debug visualization of the features being tracked in
            # the first frame (the frame the feature points were detected
            # in).
            frame = frames[i-1]
            for x, y in p0_filtered[st == 1]:
                cv2.circle(frame, (x, y), 3, (100, 100, 255), -1)
            its.image.write_image(frame, "%s_features.png" % NAME)
return numpy.array(rots)


def get_cam_times(cam_events):
"""Get the camera frame times.
Args:
cam_events: List of (start_exposure, exposure_time, readout_duration)
tuples, one per captured frame, with times in nanoseconds.
Returns:
frame_times: Array of N times, one corresponding to the "middle" of
the exposure of each frame.
"""
# Assign a time to each frame that assumes that the image is instantly
# captured in the middle of its exposure.
starts = numpy.array([start for start, exptime, readout in cam_events])
exptimes = numpy.array([exptime for start, exptime, readout in cam_events])
readouts = numpy.array([readout for start, exptime, readout in cam_events])
frame_times = starts + (exptimes + readouts) / 2.0
return frame_times
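

# Worked example of the timestamp model above (illustrative numbers): a frame
# whose exposure starts at t0, with a 20ms exposure and a 30ms rolling-shutter
# readout, is stamped at t0 + (20ms + 30ms)/2 = t0 + 25ms, the midpoint of the
# interval over which its rows gather light.

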
def load_data():
"""Load a set of previously captured data.
Returns:
events: Dictionary containing all gyro events and cam timestamps.
frames: List of RGB images as numpy arrays.
w: Pixel width of frames
h: Pixel height of frames
"""
with open("%s_events.txt" % NAME, "r") as f:
events = json.loads(f.read())
n = len(events["cam"])
frames = []
for i in range(n):
img = Image.open("%s_frame%03d.png" % (NAME, i))
w, h = img.size[0:2]
frames.append(numpy.array(img).reshape(h, w, 3) / 255.0)
return events, frames, w, h


def collect_data(fps, w, h, test_length):
"""Capture a new set of data from the device.
Captures both motion data and camera frames, while the user is moving
the device in a proscribed manner.
Args:
fps: FPS to capture with
w: Pixel width of frames
h: Pixel height of frames
test_length: How long the test should run for (in seconds)
Returns:
events: Dictionary containing all gyro events and cam timestamps.
frames: List of RGB images as numpy arrays.
"""
with its.device.ItsSession() as cam:
props = cam.get_camera_properties()
props = cam.override_with_hidden_physical_camera_props(props)
its.caps.skip_unless(its.caps.read_3a and
its.caps.sensor_fusion(props) and
props["android.lens.facing"] != FACING_EXTERNAL and
cam.get_sensors().get("gyro"))
print "Starting sensor event collection"
cam.start_sensor_events()
# Sleep a while for gyro events to stabilize.
time.sleep(0.5)
# Capture the frames. OIS is disabled for manual captures.
facing = props["android.lens.facing"]
if facing != FACING_FRONT and facing != FACING_BACK:
print "Unknown lens facing", facing
assert 0
fmt = {"format": "yuv", "width": w, "height": h}
s, e, _, _, _ = cam.do_3a(get_results=True, do_af=False)
req = its.objects.manual_capture_request(s, e)
its.objects.turn_slow_filters_off(props, req)
fd_min = props["android.lens.info.minimumFocusDistance"]
fd_chart = 1 / (CHART_DISTANCE * CM_TO_M)
if fd_min < fd_chart:
req["android.lens.focusDistance"] = fd_min
else:
req["android.lens.focusDistance"] = fd_chart
req["android.control.aeTargetFpsRange"] = [fps, fps]
req["android.sensor.frameDuration"] = int(1000.0/fps * MSEC_TO_NSEC)
print "Capturing %dx%d with sens. %d, exp. time %.1fms at %dfps" % (
w, h, s, e*NSEC_TO_MSEC, fps)
caps = cam.do_capture([req]*int(fps*test_length), fmt)
        # Capture a few more gyro samples, for use in
        # get_best_alignment_offset.
time.sleep(0.2)
# Get the gyro events.
print "Reading out sensor events"
gyro = cam.get_sensor_events()["gyro"]
print "Number of gyro samples", len(gyro)
# Combine the events into a single structure.
print "Dumping event data"
starts = [c["metadata"]["android.sensor.timestamp"] for c in caps]
exptimes = [c["metadata"]["android.sensor.exposureTime"] for c in caps]
readouts = [c["metadata"]["android.sensor.rollingShutterSkew"]
for c in caps]
events = {"gyro": gyro, "cam": zip(starts, exptimes, readouts),
"facing": facing}
with open("%s_events.txt" % NAME, "w") as f:
f.write(json.dumps(events))
# Convert the frames to RGB.
print "Dumping frames"
frames = []
for i, c in enumerate(caps):
img = its.image.convert_capture_to_rgb_image(c)
frames.append(img)
its.image.write_image(img, "%s_frame%03d.png" % (NAME, i))
return events, frames
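

# The dumped "<NAME>_events.txt" file is JSON shaped roughly as follows (an
# illustrative sketch; the gyro events carry at least the "time" and "z"
# fields that this test reads):
#
#   {"gyro": [{"time": 123456789, "z": 0.01, ...}, ...],
#    "cam": [[start_ns, exposure_ns, readout_ns], ...],
#    "facing": 1}

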
def procrustes_rotation(X, Y):
"""Performs a Procrustes analysis to conform points in X to Y.
Procrustes analysis determines a linear transformation (translation,
reflection, orthogonal rotation and scaling) of the points in Y to best
conform them to the points in matrix X, using the sum of squared errors
as the goodness of fit criterion.
Args:
X: Target coordinate matrix
Y: Input coordinate matrix
Returns:
The rotation component of the transformation that maps X to Y.
"""
X0 = (X-X.mean(0)) / numpy.sqrt(((X-X.mean(0))**2.0).sum())
Y0 = (Y-Y.mean(0)) / numpy.sqrt(((Y-Y.mean(0))**2.0).sum())
U, _, Vt = numpy.linalg.svd(numpy.dot(X0.T, Y0), full_matrices=False)
return numpy.dot(Vt.T, U.T)
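

# Minimal sketch (not part of the test) of procrustes_rotation() recovering a
# known 2-D rotation, e.g. from an interactive session:
#
#   >>> t = 0.1
#   >>> R = numpy.array([[math.cos(t), -math.sin(t)],
#   ...                  [math.sin(t), math.cos(t)]])
#   >>> X = numpy.array([[0., 0.], [1., 0.], [1., 1.], [0., 1.]])
#   >>> numpy.allclose(procrustes_rotation(X, numpy.dot(X, R.T)), R)
#   True

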
if __name__ == "__main__":
main()