blob: 4d5a36fc6a6333aead2d0962937f51b88d2ade9a [file] [log] [blame]
# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Verify image and inertial sensor events are well synchronized."""
import json
import logging
import math
import multiprocessing
import os
import sys
import time
from matplotlib import pylab
import matplotlib.pyplot
from mobly import test_runner
import numpy as np
import scipy.spatial
import cv2
import its_base_test
import camera_properties_utils
import capture_request_utils
import image_processing_utils
import its_session_utils
import sensor_fusion_utils
_CAM_FRAME_RANGE_MAX = 9.0 # Seconds: max allowed camera frame range.
_FEATURE_MARGIN = 0.20 # Only take feature points from center 20% so that
# rotation measured has less rolling shutter effect.
_FEATURE_PTS_MIN = 30 # Min number of feature pts to perform rotation analysis.
# cv2.goodFeatures to track.
# 'POSTMASK' is the measurement method in all previous versions of Android.
# 'POSTMASK' finds best features on entire frame and then masks the features
# to the vertical center FEATURE_MARGIN for the measurement.
# 'PREMASK' is a new measurement that is used when FEATURE_PTS_MIN is not
# found in frame. This finds the best 2*FEATURE_PTS_MIN in the FEATURE_MARGIN
# part of the frame.
_CV2_FEATURE_PARAMS_POSTMASK = dict(maxCorners=240,
qualityLevel=0.3,
minDistance=7,
blockSize=7)
_CV2_FEATURE_PARAMS_PREMASK = dict(maxCorners=2*_FEATURE_PTS_MIN,
qualityLevel=0.3,
minDistance=7,
blockSize=7)
_GYRO_SAMP_RATE_MIN = 100.0 # Samples/second: min gyro sample rate.
_CV2_LK_PARAMS = dict(winSize=(15, 15),
maxLevel=2,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
10, 0.03)) # cv2.calcOpticalFlowPyrLK params.
_NAME = os.path.splitext(os.path.basename(__file__))[0]
_NUM_ROTATIONS = 10
_START_FRAME = 1
_FRAME_DELTA_TOL = 1.5 # 50% margin over nominal FPS of captures
# Constants to convert between different units (for clarity).
_SEC_TO_MSEC = 1000
_MSEC_TO_NSEC = 1000*1000
_NSEC_TO_SEC = 1.0E-9
_CM_TO_M = 1E-2
_RADS_TO_DEGS = 180/math.pi
# PASS/FAIL thresholds.
_CORR_DIST_THRESH_MAX = 0.005
_OFFSET_MS_THRESH_MAX = 1 # mseconds
_ROTATION_PER_FRAME_MIN = 0.001 # rads/s
# PARAMs from S refactor.
# Set maximum exposure time to reduce motion blur
# blur = velocity * exp_time * N_pixels / FOV
# blur: 3 pixels, v: chart R (17.7cm) / rot_time (1.5s), N: 640, FOV: 30cm
_EXP_MAX = 4*_MSEC_TO_NSEC # 4 ms
_GYRO_INIT_WAIT_TIME = 0.5 # Seconds to wait for gyro to stabilize.
_GYRO_POST_WAIT_TIME = 0.2 # Seconds to wait to capture some extra gyro data.
_IMG_SIZE_MAX = 640 * 480 # Maximum image size.
_NUM_FRAMES_MAX = 300 # fps*test_length should be < this for smooth captures.
_NUM_GYRO_PTS_TO_AVG = 20 # Number of gyroscope events to average.
def _collect_data(cam, fps, w, h, test_length, rot_rig, chart_dist, log_path):
"""Capture a new set of data from the device.
Captures camera frames while the user is moving the device in the proscribed
manner. Note since the capture request is for a manual request, optical
image stabilization (OIS) is disabled.
Args:
cam: camera object.
fps: frames per second capture rate.
w: pixel width of frames.
h: pixel height of frames.
test_length: length of time for test in seconds.
rot_rig: dict with 'cntl' and 'ch' defined.
chart_dist: float value of distance to chart in meters.
log_path: location to save data.
Returns:
frames: list of RGB images as numpy arrays.
"""
logging.debug('Starting sensor event collection')
props = cam.get_camera_properties()
props = cam.override_with_hidden_physical_camera_props(props)
camera_properties_utils.skip_unless(
camera_properties_utils.sensor_fusion_capable(props))
# Start camera rotation.
p = multiprocessing.Process(
target=sensor_fusion_utils.rotation_rig,
args=(rot_rig['cntl'], rot_rig['ch'], _NUM_ROTATIONS,))
p.start()
cam.start_sensor_events()
# Sleep a while for gyro events to stabilize.
time.sleep(_GYRO_INIT_WAIT_TIME)
# Capture frames.
facing = props['android.lens.facing']
if (facing != camera_properties_utils.LENS_FACING_FRONT and
facing != camera_properties_utils.LENS_FACING_BACK):
raise AssertionError(f'Unknown lens facing: {facing}.')
fmt = {'format': 'yuv', 'width': w, 'height': h}
s, e, _, _, _ = cam.do_3a(get_results=True, do_af=False)
logging.debug('3A ISO: %d, exp: %.3fms', s, e/_MSEC_TO_NSEC)
if e > _EXP_MAX:
logging.debug('Re-assigning exposure time')
s *= int(round(e / _EXP_MAX))
e = _EXP_MAX
s_max = props['android.sensor.info.sensitivityRange'][1]
if s > s_max:
logging.debug('Calculated ISO too large! ISO: %d, MAX: %d '
'Setting ISO to max analog gain value.', s, s_max)
s = s_max
req = capture_request_utils.manual_capture_request(s, e)
capture_request_utils.turn_slow_filters_off(props, req)
fd_min = props['android.lens.info.minimumFocusDistance']
fd_chart = 1 / chart_dist
fd_meas = min(fd_min, fd_chart)
logging.debug('fd_min: %.3f, fd_chart: %.3f, fd_meas: %.3f',
fd_min, fd_chart, fd_meas)
req['android.lens.focusDistance'] = fd_meas
req['android.control.aeTargetFpsRange'] = [fps, fps]
req['android.sensor.frameDuration'] = int(1 / _NSEC_TO_SEC / fps)
logging.debug('Capturing %dx%d with ISO %d, exp %.3fms at %dfps',
w, h, s, e/_MSEC_TO_NSEC, fps)
caps = cam.do_capture([req] * int(fps * test_length), fmt)
# Capture a bit more gyro samples for use in get_best_alignment_offset
time.sleep(_GYRO_POST_WAIT_TIME)
# Get the gyro events.
logging.debug('Reading out inertial sensor events')
gyro = cam.get_sensor_events()['gyro']
logging.debug('Number of gyro samples %d', len(gyro))
# Combine the gyro and camera events into a single structure.
logging.debug('Dumping event data')
starts = [cap['metadata']['android.sensor.timestamp'] for cap in caps]
exptimes = [cap['metadata']['android.sensor.exposureTime'] for cap in caps]
readouts = [cap['metadata']['android.sensor.rollingShutterSkew']
for cap in caps]
events = {'gyro': gyro, 'cam': list(zip(starts, exptimes, readouts)),
'facing': facing}
with open('%s_events.txt' % os.path.join(log_path, _NAME), 'w') as f:
f.write(json.dumps(events))
# Convert frames to RGB.
logging.debug('Dumping frames')
frames = []
for i, cap in enumerate(caps):
img = image_processing_utils.convert_capture_to_rgb_image(cap)
frames.append(img)
image_processing_utils.write_image(img, '%s_frame%03d.png' % (
os.path.join(log_path, _NAME), i))
return events, frames
def _plot_gyro_events(gyro_events, log_path):
"""Plot x, y, and z on the gyro events.
Samples are grouped into NUM_GYRO_PTS_TO_AVG groups and averaged to minimize
random spikes in data.
Args:
gyro_events: List of gyroscope events.
log_path: Text to location to save data.
"""
nevents = (len(gyro_events) // _NUM_GYRO_PTS_TO_AVG) * _NUM_GYRO_PTS_TO_AVG
gyro_events = gyro_events[:nevents]
times = np.array([(e['time'] - gyro_events[0]['time']) * _NSEC_TO_SEC
for e in gyro_events])
x = np.array([e['x'] for e in gyro_events])
y = np.array([e['y'] for e in gyro_events])
z = np.array([e['z'] for e in gyro_events])
# Group samples into size-N groups & average each together to minimize random
# spikes in data.
times = times[_NUM_GYRO_PTS_TO_AVG//2::_NUM_GYRO_PTS_TO_AVG]
x = x.reshape(nevents//_NUM_GYRO_PTS_TO_AVG, _NUM_GYRO_PTS_TO_AVG).mean(1)
y = y.reshape(nevents//_NUM_GYRO_PTS_TO_AVG, _NUM_GYRO_PTS_TO_AVG).mean(1)
z = z.reshape(nevents//_NUM_GYRO_PTS_TO_AVG, _NUM_GYRO_PTS_TO_AVG).mean(1)
pylab.figure(_NAME)
# x & y on same axes
pylab.subplot(2, 1, 1)
pylab.title(_NAME + ' (mean of %d pts)' % _NUM_GYRO_PTS_TO_AVG)
pylab.plot(times, x, 'r', label='x')
pylab.plot(times, y, 'g', label='y')
pylab.ylabel('gyro x & y movement (rads/s)')
pylab.legend()
# z on separate axes
pylab.subplot(2, 1, 2)
pylab.plot(times, z, 'b', label='z')
pylab.xlabel('time (seconds)')
pylab.ylabel('gyro z movement (rads/s)')
pylab.legend()
matplotlib.pyplot.savefig(
'%s_gyro_events.png' % (os.path.join(log_path, _NAME)))
def _get_cam_times(cam_events, fps):
"""Get the camera frame times.
Assign a time to each frame. Assumes the image is instantly captured in the
middle of exposure.
Args:
cam_events: List of (start_exposure, exposure_time, readout_duration)
tuples, one per captured frame, with times in nanoseconds.
fps: float of frames per second value
Returns:
frame_times: Array of N times, one corresponding to the 'middle' of the
exposure of each frame.
"""
starts = np.array([start for start, exptime, readout in cam_events])
max_frame_delta_ms = (np.amax(np.subtract(starts[1:], starts[0:-1])) /
_MSEC_TO_NSEC)
logging.debug('Maximum frame delta: %.3f ms', max_frame_delta_ms)
frame_delta_tol_ms = _FRAME_DELTA_TOL * (1 / fps) * _SEC_TO_MSEC
if max_frame_delta_ms > frame_delta_tol_ms:
raise AssertionError(f'Frame drop! Max delta: {max_frame_delta_ms:.3f}ms, '
f'ATOL: {frame_delta_tol_ms}ms')
exptimes = np.array([exptime for start, exptime, readout in cam_events])
if not np.all(exptimes == exptimes[0]):
raise AssertionError(f'Exposure times vary in frames! {exptimes}')
readouts = np.array([readout for start, exptime, readout in cam_events])
if not np.all(readouts == readouts[0]):
raise AssertionError(f'Rolling shutter not always equal! {readouts}')
frame_times = starts + (exptimes + readouts) / 2.0
return frame_times
def _procrustes_rotation(x, y):
"""Performs a Procrustes analysis to conform points in x to y.
Procrustes analysis determines a linear transformation (translation,
reflection, orthogonal rotation and scaling) of the points in y to best
conform them to the points in matrix x, using the sum of squared errors
as the metric for fit criterion.
Args:
x: Target coordinate matrix
y: Input coordinate matrix
Returns:
The rotation component of the transformation that maps x to y.
"""
x0 = (x-x.mean(0)) / np.sqrt(((x-x.mean(0))**2.0).sum())
y0 = (y-y.mean(0)) / np.sqrt(((y-y.mean(0))**2.0).sum())
u, _, vt = np.linalg.svd(np.dot(x0.T, y0), full_matrices=False)
return np.dot(vt.T, u.T)
def _get_cam_rotations(frames, facing, h, log_path):
"""Get the rotations of the camera between each pair of frames.
Takes N frames and returns N-1 angular displacements corresponding to the
rotations between adjacent pairs of frames, in radians.
Only takes feature points from center so that rotation measured has less
rolling shutter effect.
Requires FEATURE_PTS_MIN to have enough data points for accurate measurements.
Uses FEATURE_PARAMS for cv2 to identify features in checkerboard images.
Ensures camera rotates enough.
Args:
frames: List of N images (as RGB numpy arrays).
facing: Direction camera is facing.
h: Pixel height of each frame.
log_path: Location for data.
Returns:
numpy array of N-1 camera rotation measurements (rad).
"""
gframes = []
file_name_stem = os.path.join(log_path, _NAME)
for frame in frames:
frame = (frame * 255.0).astype(np.uint8) # cv2 uses [0, 255]
gframes.append(cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY))
num_frames = len(gframes)
logging.debug('num_frames: %d', num_frames)
# create mask
ymin = int(h * (1 - _FEATURE_MARGIN) / 2)
ymax = int(h * (1 + _FEATURE_MARGIN) / 2)
pre_mask = np.zeros_like(gframes[0])
pre_mask[ymin:ymax, :] = 255
for masking in ['post', 'pre']: # Do post-masking (original) method 1st
logging.debug('Using %s masking method', masking)
rots = []
for i in range(1, num_frames):
j = i - 1
gframe0 = gframes[j]
gframe1 = gframes[i]
if masking == 'post':
p0 = cv2.goodFeaturesToTrack(
gframe0, mask=None, **_CV2_FEATURE_PARAMS_POSTMASK)
post_mask = (p0[:, 0, 1] >= ymin) & (p0[:, 0, 1] <= ymax)
p0_filtered = p0[post_mask]
else:
p0_filtered = cv2.goodFeaturesToTrack(
gframe0, mask=pre_mask, **_CV2_FEATURE_PARAMS_PREMASK)
num_features = len(p0_filtered)
if num_features < _FEATURE_PTS_MIN:
for pt in p0_filtered:
x, y = pt[0][0], pt[0][1]
cv2.circle(frames[j], (x, y), 3, (100, 255, 255), -1)
image_processing_utils.write_image(
frames[j], f'{file_name_stem}_features{j+_START_FRAME:03d}.png')
msg = (f'Not enough features in frame {j+_START_FRAME}. Need at least '
f'{_FEATURE_PTS_MIN} features, got {num_features}.')
if masking == 'pre':
raise AssertionError(msg)
else:
logging.debug(msg)
break
else:
logging.debug('Number of features in frame %s is %d',
str(j+_START_FRAME).zfill(3), num_features)
p1, st, _ = cv2.calcOpticalFlowPyrLK(gframe0, gframe1, p0_filtered, None,
**_CV2_LK_PARAMS)
tform = _procrustes_rotation(p0_filtered[st == 1], p1[st == 1])
if facing == camera_properties_utils.LENS_FACING_BACK:
rot = -math.atan2(tform[0, 1], tform[0, 0])
elif facing == camera_properties_utils.LENS_FACING_FRONT:
rot = math.atan2(tform[0, 1], tform[0, 0])
else:
raise AssertionError(f'Unknown lens facing: {facing}.')
rots.append(rot)
if i == 1:
# Save debug visualization of features that are being
# tracked in the first frame.
frame = frames[j]
for x, y in p0_filtered[st == 1]:
cv2.circle(frame, (x, y), 3, (100, 255, 255), -1)
image_processing_utils.write_image(
frame, f'{file_name_stem}_features{j+_START_FRAME:03d}.png')
if i == num_frames-1:
logging.debug('Correct num of frames found: %d', i)
break # exit if enough features in all frames
if i != num_frames-1:
raise AssertionError('Neither method found enough features in all frames')
rots = np.array(rots)
rot_per_frame_max = max(abs(rots))
logging.debug('Max rotation: %.4f radians', rot_per_frame_max)
if rot_per_frame_max < _ROTATION_PER_FRAME_MIN:
raise AssertionError(f'Device not moved enough: {rot_per_frame_max:.3f} '
f'movement. THRESH: {_ROTATION_PER_FRAME_MIN}.')
return rots
def _plot_best_shift(best, coeff, x, y, log_path):
"""Saves a plot the best offset, fit data and x,y data.
Args:
best: x value of best fit data.
coeff: 3 element np array. Return of np.polyfit(x,y) for 2nd order fit.
x: np array of x data that was fit.
y: np array of y data that was fit.
log_path: where to store data.
"""
xfit = np.arange(x[0], x[-1], 0.05).tolist()
yfit = [coeff[0]*x*x + coeff[1]*x + coeff[2] for x in xfit]
pylab.figure()
pylab.title(f'{_NAME} Gyro/Camera Time Correlation')
pylab.plot(x, y, 'ro', label='data', alpha=0.7)
pylab.plot(xfit, yfit, 'b', label='fit', alpha=0.7)
pylab.plot(best, min(yfit), 'g*', label='best', markersize=10)
pylab.ticklabel_format(axis='y', style='sci', scilimits=(-3, -3))
pylab.xlabel('Relative horizontal shift between curves (ms)')
pylab.ylabel('Correlation distance')
pylab.legend()
matplotlib.pyplot.savefig(
'%s_plot_shifts.png' % os.path.join(log_path, _NAME))
def _plot_rotations(cam_rots, gyro_rots, log_path):
"""Saves a plot of the camera vs. gyro rotational measurements.
Args:
cam_rots: Array of camera rotation measurements (rads).
gyro_rots: Array of gyro rotation measurements (rads).
log_path: Location to store data.
"""
# For plot, scale rotations to degrees.
pylab.figure()
pylab.title(f'{_NAME} Gyro/Camera Rotations')
pylab.plot(range(len(cam_rots)), cam_rots*_RADS_TO_DEGS, '-r.',
label='camera', alpha=0.7)
pylab.plot(range(len(gyro_rots)), gyro_rots*_RADS_TO_DEGS, '-b.',
label='gyro', alpha=0.7)
pylab.xlabel('Camera frame number')
pylab.ylabel('Angular displacement between adjacent camera frames (degrees)')
pylab.xlim([0, len(cam_rots)])
pylab.legend()
matplotlib.pyplot.savefig(
'%s_plot_rotations.png' % os.path.join(log_path, _NAME))
def load_data():
"""Load a set of previously captured data.
Returns:
events: Dictionary containing all gyro events and cam timestamps.
frames: List of RGB images as numpy arrays.
w: Pixel width of frames
h: Pixel height of frames
"""
with open(f'{_NAME}_events.txt', 'r') as f:
events = json.loads(f.read())
n = len(events['cam'])
frames = []
for i in range(n):
img = image_processing_utils.read_image(f'{_NAME}_frame{i:03d}.png')
w, h = img.size[0:2]
frames.append(np.array(img).reshape((h, w, 3)) / 255)
return events, frames, w, h
class SensorFusionTest(its_base_test.ItsBaseTest):
"""Tests if image and motion sensor events are well synchronized.
Tests gyro and camera timestamp differences while camera is rotating.
Test description is in SensorFusion.pdf file. Test rotates phone in proscribed
manner and captures images.
Camera rotation is determined from images and from gyroscope events.
Timestamp offset between gyro and camera is determined using scipy
spacial correlation distance. The min value is determined as the optimum.
PASS/FAIL based on the offset and also the correlation distance.
"""
def _assert_gyro_encompasses_camera(self, cam_times, gyro_times):
"""Confirms the camera events are bounded by the gyroscope events.
Also ensures:
1. Camera frame range is less than MAX_CAMERA_FRAME_RANGE. When camera
frame range is significantly larger than spec, the system is usually in a
busy/bad state.
2. Gyro samples per second are greater than GYRO_SAMP_RATE_MIN
Args:
cam_times: numpy array of camera times.
gyro_times: List of 'gyro' times.
"""
min_cam_time = min(cam_times) * _NSEC_TO_SEC
max_cam_time = max(cam_times) * _NSEC_TO_SEC
min_gyro_time = min(gyro_times) * _NSEC_TO_SEC
max_gyro_time = max(gyro_times) * _NSEC_TO_SEC
if not (min_cam_time > min_gyro_time and max_cam_time < max_gyro_time):
raise AssertionError(
f'Camera timestamps [{min_cam_time}, {max_cam_time}] not '
f'enclosed by gyro timestamps [{min_gyro_time}, {max_gyro_time}]')
cam_frame_range = max_cam_time - min_cam_time
logging.debug('Camera frame range: %f', cam_frame_range)
gyro_time_range = max_gyro_time - min_gyro_time
gyro_smp_per_sec = len(gyro_times) / gyro_time_range
logging.debug('Gyro samples per second: %f', gyro_smp_per_sec)
if cam_frame_range > _CAM_FRAME_RANGE_MAX:
raise AssertionError(f'Camera frame range, {cam_frame_range}s, too high!')
if gyro_smp_per_sec < _GYRO_SAMP_RATE_MIN:
raise AssertionError(f'Gyro sample rate, {gyro_smp_per_sec}S/s, low!')
def test_sensor_fusion(self):
# Determine if '--replay' in cmd line
replay = False
for s in list(sys.argv[1:]):
if '--replay' in s:
replay = True
logging.info('test_sensor_fusion.py not run. Using existing data set.')
rot_rig = {}
fps = float(self.fps)
img_w, img_h = self.img_w, self.img_h
test_length = float(self.test_length)
log_path = self.log_path
chart_distance = self.chart_distance * _CM_TO_M
if img_w * img_h > _IMG_SIZE_MAX or fps * test_length > _NUM_FRAMES_MAX:
logging.debug(
'Warning: Your test parameters may require fast write speeds'
' to run smoothly. If you run into problems, consider'
" smaller values of 'w', 'h', 'fps', or 'test_length'.")
if replay:
events, frames, _, h = load_data()
else:
with its_session_utils.ItsSession(
device_id=self.dut.serial,
camera_id=self.camera_id,
hidden_physical_id=self.hidden_physical_id) as cam:
rot_rig['cntl'] = self.rotator_cntl
rot_rig['ch'] = self.rotator_ch
events, frames = _collect_data(cam, fps, img_w, img_h, test_length,
rot_rig, chart_distance, log_path)
logging.debug('Start frame: %d', _START_FRAME)
_plot_gyro_events(events['gyro'], log_path)
# Validity check on gyro/camera timestamps
cam_times = _get_cam_times(
events['cam'][_START_FRAME:len(events['cam'])], fps)
gyro_times = [e['time'] for e in events['gyro']]
self._assert_gyro_encompasses_camera(cam_times, gyro_times)
# Compute cam rotation displacement(rads) between pairs of adjacent frames.
cam_rots = _get_cam_rotations(
frames[_START_FRAME:len(frames)], events['facing'], img_h, log_path)
logging.debug('cam_rots: %s', str(cam_rots))
gyro_rots = sensor_fusion_utils.get_gyro_rotations(
events['gyro'], cam_times)
_plot_rotations(cam_rots, gyro_rots, log_path)
# Find the best offset.
offset_ms, coeffs, candidates, distances = sensor_fusion_utils.get_best_alignment_offset(
cam_times, cam_rots, events['gyro'])
_plot_best_shift(offset_ms, coeffs, candidates, distances, log_path)
# Calculate correlation distance with best offset.
corr_dist = scipy.spatial.distance.correlation(cam_rots, gyro_rots)
logging.debug('Best correlation of %f at shift of %.3fms',
corr_dist, offset_ms)
# Assert PASS/FAIL criteria.
if corr_dist > _CORR_DIST_THRESH_MAX:
raise AssertionError(f'Poor gyro/camera correlation: {corr_dist:.6f}, '
f'TOL: {_CORR_DIST_THRESH_MAX}.')
if abs(offset_ms) > _OFFSET_MS_THRESH_MAX:
raise AssertionError('Offset too large. Measured (ms): '
f'{offset_ms:.3f}, TOL: {_OFFSET_MS_THRESH_MAX}.')
if __name__ == '__main__':
test_runner.main()