# Source: android/platform/cts @ 0ab277a548e474697d48651a389dbf4aea4745f9
# apps/CameraITS/tests/sensor_fusion/test_sensor_fusion.py

# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import bisect
import json
import math
import os.path
import sys
import time

import cv2
import Image
import matplotlib
import matplotlib.pyplot
import numpy
import pylab
import scipy.spatial

import its.device
import its.image
import its.objects

# Base name of this script, used to name all output files (plots, dumps).
NAME = os.path.basename(__file__).split(".")[0]

# Capture 210 QVGA frames (which is 7s at 30fps)
N = 210
W,H = 320,240

# Parameters for cv2.goodFeaturesToTrack (Shi-Tomasi corner detection).
FEATURE_PARAMS = dict( maxCorners = 50,
                       qualityLevel = 0.3,
                       minDistance = 7,
                       blockSize = 7 )

# Parameters for cv2.calcOpticalFlowPyrLK (pyramidal Lucas-Kanade tracking).
LK_PARAMS = dict( winSize = (15, 15),
                  maxLevel = 2,
                  criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
                              10, 0.03))

# Constants to convert between different time units (for clarity).
SEC_TO_NSEC = 1000*1000*1000.0
SEC_TO_MSEC = 1000.0
MSEC_TO_NSEC = 1000*1000.0
MSEC_TO_SEC = 1/1000.0
NSEC_TO_SEC = 1/(1000*1000*1000.0)
NSEC_TO_MSEC = 1/(1000*1000.0)

# Pass/fail thresholds.
# Maximum allowed correlation distance between the camera and gyro traces.
THRESH_MAX_CORR_DIST = 0.005
# Maximum allowed time offset (ms) between camera and gyro timestamps.
THRESH_MAX_SHIFT_MS = 2
# Minimum rotation (rad) required to consider the device "moved enough".
THRESH_MIN_ROT = 0.001

def main():
    """Test if image and motion sensor events are well synchronized.

    The instructions for running this test are in the SensorFusion.pdf file in
    the same directory as this test.

    The command-line argument "replay" may be optionally provided. Without this
    argument, the test will collect a new set of camera+gyro data from the
    device and then analyze it (and it will also dump this data to files in the
    current directory). If the "replay" argument is provided, then the script
    will instead load the dumped data from a previous run and analyze that
    instead. This can be helpful for developers who are digging for additional
    information on their measurements.
    """
    # Collect or load the camera+gyro data. All gyro events as well as camera
    # timestamps are in the "events" dictionary, and "frames" is a list of
    # RGB images as numpy arrays.
    if "replay" not in sys.argv:
        events, frames = collect_data()
    else:
        events, frames = load_data()

    # Sanity check camera timestamps are enclosed by sensor timestamps.
    # This will catch bugs where camera and gyro timestamps go completely out
    # of sync.
    cam_times = get_cam_times(events["cam"])
    min_cam_time = min(cam_times) * NSEC_TO_SEC
    max_cam_time = max(cam_times) * NSEC_TO_SEC
    gyro_times = [e["time"] for e in events["gyro"]]
    min_gyro_time = min(gyro_times) * NSEC_TO_SEC
    max_gyro_time = max(gyro_times) * NSEC_TO_SEC
    if not (min_cam_time > min_gyro_time and max_cam_time < max_gyro_time):
        print "Test failed: camera timestamps [%f,%f] " \
              "are not enclosed by gyro timestamps [%f, %f]" % (
              min_cam_time, max_cam_time, min_gyro_time, max_gyro_time)
        assert(0)

    # Compute the camera rotation displacements (rad) between each pair of
    # adjacent frames; if the device was barely moved there is no motion
    # signal to correlate against, so fail early.
    cam_rots = get_cam_rotations(frames)
    if max(abs(cam_rots)) < THRESH_MIN_ROT:
        print "Device wasn't moved enough"
        assert(0)

    # Find the best offset (time-shift) to align the gyro and camera motion
    # traces; this function integrates the shifted gyro data between camera
    # samples for a range of candidate shift values, and returns the shift that
    # result in the best correlation.
    offset = get_best_alignment_offset(cam_times, cam_rots, events["gyro"])

    # Plot the camera and gyro traces after applying the best shift.
    cam_times = cam_times + offset*SEC_TO_NSEC
    gyro_rots = get_gyro_rotations(events["gyro"], cam_times)
    plot_rotations(cam_rots, gyro_rots)

    # Pass/fail based on the offset and also the correlation distance
    # (smaller correlation distance means better-matched traces).
    dist = scipy.spatial.distance.correlation(cam_rots,gyro_rots)
    print "Best correlation of %f at shift of %.2fms"%(dist, offset*SEC_TO_MSEC)
    assert(dist < THRESH_MAX_CORR_DIST)
    assert(abs(offset) < THRESH_MAX_SHIFT_MS*MSEC_TO_SEC)

def get_best_alignment_offset(cam_times, cam_rots, gyro_events): | |

"""Find the best offset to align the camera and gyro traces. | |

Uses a correlation distance metric between the curves, where a smaller | |

value means that the curves are better-correlated. | |

Args: | |

cam_times: Array of N camera times, one for each frame. | |

cam_rots: Array of N-1 camera rotation displacements (rad). | |

gyro_events: List of gyro event objects. | |

Returns: | |

Offset (seconds) of the best alignment. | |

""" | |

# Measure the corr. dist. over a shift of up to +/- 100ms (1ms step size). | |

# Get the shift corresponding to the best (lowest) score. | |

candidates = range(-100,101) | |

dists = [] | |

for shift in candidates: | |

times = cam_times + shift*MSEC_TO_NSEC | |

gyro_rots = get_gyro_rotations(gyro_events, times) | |

dists.append(scipy.spatial.distance.correlation(cam_rots,gyro_rots)) | |

best_corr_dist = min(dists) | |

best_shift = candidates[dists.index(best_corr_dist)] | |

# Fit a curve to the corr. dist. data to measure the minima more | |

# accurately, by looking at the correlation distances within a range of | |

# +/- 20ms from the measured best score; note that this will use fewer | |

# than the full +/- 20 range for the curve fit if the measured score | |

# (which is used as the center of the fit) is within 20ms of the edge of | |

# the +/- 100ms candidate range. | |

i = len(dists)/2 + best_shift | |

candidates = candidates[i-20:i+21] | |

dists = dists[i-20:i+21] | |

a,b,c = numpy.polyfit(candidates, dists, 2) | |

exact_best_shift = -b/(2*a) | |

if abs(best_shift - exact_best_shift) > 2.0 or a <= 0 or c <= 0: | |

print "Test failed; bad fit to time-shift curve" | |

assert(0) | |

xfit = [x/10.0 for x in xrange(candidates[0]*10,candidates[-1]*10)] | |

yfit = [a*x*x+b*x+c for x in xfit] | |

fig = matplotlib.pyplot.figure() | |

pylab.plot(candidates, dists, 'r', label="data") | |

pylab.plot(xfit, yfit, 'b', label="fit") | |

pylab.plot([exact_best_shift+x for x in [-0.1,0,0.1]], [0,0.01,0], 'b') | |

pylab.xlabel("Relative horizontal shift between curves (ms)") | |

pylab.ylabel("Correlation distance") | |

pylab.legend() | |

matplotlib.pyplot.savefig("%s_plot_shifts.png" % (NAME)) | |

return exact_best_shift * MSEC_TO_SEC | |

def plot_rotations(cam_rots, gyro_rots):
    """Save a plot comparing the camera and gyro rotational measurements.

    Writes the figure to "<NAME>_plot.png" in the current directory.

    Args:
        cam_rots: Array of N-1 camera rotation measurements (rad).
        gyro_rots: Array of N-1 gyro rotation measurements (rad).
    """
    matplotlib.pyplot.figure()
    # Scale the rotations from radians to degrees for a more readable plot.
    rad_to_deg = 360/(2*math.pi)
    cam_degs = cam_rots * rad_to_deg
    gyro_degs = gyro_rots * rad_to_deg
    pylab.plot(range(len(cam_degs)), cam_degs, 'r', label="camera")
    pylab.plot(range(len(gyro_degs)), gyro_degs, 'b', label="gyro")
    pylab.legend()
    pylab.xlabel("Camera frame number")
    pylab.ylabel("Angular displacement between adjacent camera frames (deg)")
    pylab.xlim([0, len(cam_degs)])
    matplotlib.pyplot.savefig("%s_plot.png" % (NAME))

def get_gyro_rotations(gyro_events, cam_times):
    """Get the rotation values of the gyro.

    Integrates the gyro's z-axis angular velocity between each pair of
    adjacent camera frame times to produce an angular displacement per
    frame interval. Each gyro sample interval is integrated with the
    trapezoid rule (average of the two endpoint rates times the interval
    duration), and the partial intervals overlapping the window edges are
    weighted by the fraction that falls inside the window.

    Args:
        gyro_events: List of gyro event objects.
        cam_times: Array of N camera times, one for each frame.

    Returns:
        Array of N-1 gyro rotation measurements (rad).
    """
    times = numpy.array([e["time"] for e in gyro_events])
    rates = numpy.array([e["z"] for e in gyro_events])
    rotations = []
    for t0, t1 in zip(cam_times[:-1], cam_times[1:]):
        # Indices of the first gyro samples at-or-after t0 and t1; gyro
        # samples with indices in [j0, j1) lie strictly inside the window.
        j0 = bisect.bisect(times, t0)
        j1 = bisect.bisect(times, t1)
        total = 0
        # Integrate the whole gyro sample intervals inside the window.
        for j in range(j0, j1):
            mean_rate = 0.5 * (rates[j] + rates[j+1])
            total += mean_rate * ((times[j+1] - times[j]) * NSEC_TO_SEC)
        # Partial interval straddling the left edge: keep only the part
        # after t0, i.e. the (1 - f) fraction of the interval.
        mean_rate = 0.5 * (rates[j0-1] + rates[j0])
        span = (times[j0] - times[j0-1]) * NSEC_TO_SEC
        f = (t0 - times[j0-1]) / (times[j0] - times[j0-1])
        total += mean_rate * span * (1.0 - f)
        # Partial interval straddling the right edge: keep only the part
        # before t1, i.e. the f fraction of the interval.
        mean_rate = 0.5 * (rates[j1] + rates[j1+1])
        span = (times[j1+1] - times[j1]) * NSEC_TO_SEC
        f = (t1 - times[j1]) / (times[j1+1] - times[j1])
        total += mean_rate * span * f
        rotations.append(total)
    return numpy.array(rotations)

def get_cam_rotations(frames):
    """Get the rotations of the camera between each pair of frames.

    Takes N frames and returns N-1 angular displacements corresponding to the
    rotations between adjacent pairs of frames, in radians. For each pair,
    corners are detected in the older frame, tracked into the newer frame
    with pyramidal Lucas-Kanade optical flow, and a rigid rotation is fit
    between the two point sets.

    Args:
        frames: List of N images (as RGB numpy arrays).

    Returns:
        Array of N-1 camera rotation measurements (rad).
    """
    # Convert each frame to an 8-bit grayscale image for cv2 feature
    # detection and tracking (the input frames are float RGB).
    gframes = []
    for frame in frames:
        frame = (frame * 255.0).astype(numpy.uint8)
        gframes.append(cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY))
    rots = []
    for i in range(1,len(gframes)):
        gframe0 = gframes[i-1]
        gframe1 = gframes[i]
        p0 = cv2.goodFeaturesToTrack(gframe0, mask=None, **FEATURE_PARAMS)
        p1,st,_ = cv2.calcOpticalFlowPyrLK(gframe0, gframe1, p0, None,
                                           **LK_PARAMS)
        # Fit a rotation using only the successfully-tracked points (st==1)
        # and extract its angle from the 2x2 rotation matrix.
        tform = procrustes_rotation(p0[st==1], p1[st==1])
        # TODO: Choose the sign for the rotation so the cam matches the gyro
        rot = -math.atan2(tform[0, 1], tform[0, 0])
        rots.append(rot)
        if i == 1:
            # Save a debug visualization of the features that are being
            # tracked in the first frame.
            # NOTE(review): the circles are drawn on frames[i] (the second
            # frame) at positions detected in frames[i-1]; confirm whether
            # frames[i-1] was intended here.
            frame = frames[i]
            for x,y in p0[st==1]:
                cv2.circle(frame, (x,y), 3, (100,100,255), -1)
            its.image.write_image(frame, "%s_features.jpg"%(NAME))
    return numpy.array(rots)

def get_cam_times(cam_events):
    """Get the camera frame times.

    Each frame is treated as if it were captured instantaneously at the
    midpoint of its exposure window (exposure time plus readout duration
    after the exposure start).

    Args:
        cam_events: List of (start_exposure, exposure_time, readout_duration)
            tuples, one per captured frame, with times in nanoseconds.

    Returns:
        frame_times: Array of N times, one corresponding to the "middle" of
            the exposure of each frame.
    """
    starts = numpy.array([event[0] for event in cam_events])
    exptimes = numpy.array([event[1] for event in cam_events])
    readouts = numpy.array([event[2] for event in cam_events])
    return starts + (exptimes + readouts) / 2.0

def load_data():
    """Load a set of previously captured data.

    Reads the JSON event dump plus the per-frame JPEG images written by a
    prior capture run, from the current directory.

    Returns:
        events: Dictionary containing all gyro events and cam timestamps.
        frames: List of RGB images as numpy arrays.
    """
    with open("%s_events.txt" % (NAME), "r") as f:
        events = json.loads(f.read())
    # One dumped frame exists per camera event.
    frames = []
    for i in range(len(events["cam"])):
        img = Image.open("%s_frame%03d.jpg" % (NAME, i))
        w, h = img.size[0:2]
        # Rescale the 8-bit pixel data to floats in [0, 1].
        frames.append(numpy.array(img).reshape(h, w, 3) / 255.0)
    return events, frames

def collect_data():
    """Capture a new set of data from the device.

    Captures both motion data and camera frames, while the user is moving
    the device in a proscribed manner. Also dumps the collected data to the
    current directory (a JSON event file plus one JPEG per frame) so that a
    later run can re-analyze it via load_data().

    Returns:
        events: Dictionary containing all gyro events and cam timestamps.
        frames: List of RGB images as numpy arrays.
    """
    with its.device.ItsSession() as cam:
        print "Starting sensor event collection"
        cam.start_sensor_events()
        # Sleep a few seconds for gyro events to stabilize.
        time.sleep(5)
        # TODO: Ensure that OIS is disabled; set to DISABLE and wait some time.
        # Capture the frames.
        props = cam.get_camera_properties()
        fmt = {"format":"yuv", "width":W, "height":H}
        # Run 3A once and lock the resulting sensitivity/exposure into a
        # manual request so all N frames use identical settings.
        s,e,_,_,_ = cam.do_3a(get_results=True)
        req = its.objects.manual_capture_request(s, e)
        print "Capturing %dx%d with sens. %d, exp. time %.1fms" % (
                W, H, s, e*NSEC_TO_MSEC)
        caps = cam.do_capture([req]*N, fmt)
        # Get the gyro events.
        print "Reading out sensor events"
        gyro = cam.get_sensor_events()["gyro"]
        # Combine the events into a single structure, with the camera events
        # stored as (timestamp, exposure_time, readout_skew) tuples.
        print "Dumping event data"
        starts = [c["metadata"]["android.sensor.timestamp"] for c in caps]
        exptimes = [c["metadata"]["android.sensor.exposureTime"] for c in caps]
        readouts = [c["metadata"]["android.sensor.rollingShutterSkew"]
                    for c in caps]
        events = {"gyro": gyro, "cam": zip(starts,exptimes,readouts)}
        with open("%s_events.txt"%(NAME), "w") as f:
            f.write(json.dumps(events))
        # Convert the frames to RGB.
        print "Dumping frames"
        frames = []
        for i,c in enumerate(caps):
            img = its.image.convert_capture_to_rgb_image(c)
            frames.append(img)
            its.image.write_image(img, "%s_frame%03d.jpg"%(NAME,i))
        return events, frames

def procrustes_rotation(X, Y):
    """Fit the rotation component of a Procrustes mapping between point sets.

    Procrustes analysis determines a linear transformation (translation,
    reflection, orthogonal rotation and scaling) of the points in Y to best
    conform them to the points in matrix X, using the sum of squared errors
    as the goodness of fit criterion. Only the orthogonal component is
    returned here.

    Args:
        X, Y: Matrices of target and input coordinates (one point per row).

    Returns:
        The rotation component of the transformation that maps X to Y.
    """
    def normalize(M):
        # Remove translation (center the points) and scale (divide by the
        # Frobenius norm of the centered points).
        centered = M - M.mean(0)
        return centered / numpy.sqrt((centered**2.0).sum())
    # The optimal orthogonal map comes from the SVD of the cross-covariance
    # of the two normalized point sets.
    U, s, Vt = numpy.linalg.svd(numpy.dot(normalize(X).T, normalize(Y)),
                                full_matrices=False)
    return numpy.dot(Vt.T, U.T)

# Run the full capture-and-analysis flow when invoked as a script.
if __name__ == '__main__':
    main()