blob: e1c50d18aaac1c85513ecc0527431235e7c860a9 [file] [log] [blame] [edit]
# Copyright 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for sensor_fusion hardware rig."""
import bisect
import codecs
import logging
import math
import os
import struct
import time
import cv2
import matplotlib
from matplotlib import pylab
import matplotlib.pyplot
import numpy as np
import scipy.spatial
import serial
from serial.tools import list_ports
import camera_properties_utils
import image_processing_utils
matplotlib.use('agg') # Must be executed before any figure is created.
# Constants for Rotation Rig
ARDUINO_ANGLE_MAX = 180.0 # degrees
ARDUINO_ANGLES_SENSOR_FUSION = (0, 90) # degrees
ARDUINO_ANGLES_STABILIZATION = (10, 25) # degrees
ARDUINO_BAUDRATE = 9600
ARDUINO_CMD_LENGTH = 3
ARDUINO_CMD_TIME = 2.0 * ARDUINO_CMD_LENGTH / ARDUINO_BAUDRATE # round trip
ARDUINO_MOVE_TIME_SENSOR_FUSION = 2 # seconds
ARDUINO_MOVE_TIME_STABILIZATION = 0.3 # seconds
ARDUINO_PID = 0x0043
ARDUINO_SERVO_SPEED_MAX = 255
ARDUINO_SERVO_SPEED_MIN = 1
ARDUINO_SERVO_SPEED_SENSOR_FUSION = 20
ARDUINO_SERVO_SPEED_STABILIZATION = 10
ARDUINO_SERVO_SPEED_STABILIZATION_TABLET = 20
ARDUINO_SPEED_START_BYTE = 253
ARDUINO_START_BYTE = 255
ARDUINO_START_NUM_TRYS = 5
ARDUINO_START_TIMEOUT = 300 # seconds
ARDUINO_STRING = 'Arduino'
ARDUINO_TEST_CMD = (b'\x01', b'\x02', b'\x03')
ARDUINO_VALID_CH = ('1', '2', '3', '4', '5', '6')
ARDUINO_VIDS = (0x2341, 0x2a03)
CANAKIT_BAUDRATE = 115200
CANAKIT_CMD_TIME = 0.05 # seconds (found experimentally)
CANAKIT_DATA_DELIMITER = '\r\n'
CANAKIT_PID = 0xfc73
CANAKIT_SEND_TIMEOUT = 0.02 # seconds
CANAKIT_SET_CMD = 'REL'
CANAKIT_SLEEP_TIME = 2 # seconds (for full 90 degree rotation)
CANAKIT_VALID_CMD = ('ON', 'OFF')
CANAKIT_VALID_CH = ('1', '2', '3', '4')
CANAKIT_VID = 0x04d8
HS755HB_ANGLE_MAX = 202.0 # throw for rotation motor in degrees
# From test_sensor_fusion
_FEATURE_MARGIN = 0.20 # Only take feature points from center 20% so that
# rotation measured has less rolling shutter effect.
_FEATURE_PTS_MIN = 30 # Min number of feature pts to perform rotation analysis.
# cv2.goodFeatures to track.
# 'POSTMASK' is the measurement method in all previous versions of Android.
# 'POSTMASK' finds best features on entire frame and then masks the features
# to the vertical center FEATURE_MARGIN for the measurement.
# 'PREMASK' is a new measurement that is used when FEATURE_PTS_MIN is not
# found in frame. This finds the best 2*FEATURE_PTS_MIN in the FEATURE_MARGIN
# part of the frame.
_CV2_FEATURE_PARAMS_POSTMASK = dict(maxCorners=240,
qualityLevel=0.3,
minDistance=7,
blockSize=7)
_CV2_FEATURE_PARAMS_PREMASK = dict(maxCorners=2*_FEATURE_PTS_MIN,
qualityLevel=0.3,
minDistance=7,
blockSize=7)
_GYRO_SAMP_RATE_MIN = 100.0 # Samples/second: min gyro sample rate.
_CV2_LK_PARAMS = dict(winSize=(15, 15),
maxLevel=2,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
10, 0.03)) # cv2.calcOpticalFlowPyrLK params.
_ROTATION_PER_FRAME_MIN = 0.001 # rads/s
_GYRO_ROTATION_PER_SEC_MAX = 2.0 # rads/s
_R_SQUARED_TOLERANCE = 0.01 # tolerance for polynomial fitting r^2
_SHIFT_DOMAIN_RADIUS = 5 # limited domain centered around best shift
# unittest constants
_COARSE_FIT_RANGE = 20 # Range area around coarse fit to do optimization.
_CORR_TIME_OFFSET_MAX = 50 # ms max shift to try and match camera/gyro times.
_CORR_TIME_OFFSET_STEP = 0.5 # ms step for shifts.
# Unit translators
_MSEC_TO_NSEC = 1000000
_NSEC_TO_SEC = 1E-9
_SEC_TO_NSEC = int(1/_NSEC_TO_SEC)
_RADS_TO_DEGS = 180/math.pi
_NUM_GYRO_PTS_TO_AVG = 20
def polynomial_from_coefficients(coefficients):
"""Return a polynomial function from a coefficient list, highest power first.
Args:
coefficients: list of coefficients (float)
Returns:
Function in the form of a*x^n + b*x^(n - 1) + ... + constant
"""
def polynomial(x):
n = len(coefficients)
return sum(coefficients[i] * x ** (n - i - 1) for i in range(n))
return polynomial
def smallest_absolute_minimum_of_polynomial(coefficients):
"""Return the smallest minimum by absolute value from a coefficient list.
Args:
coefficients: list of coefficients (float)
Returns:
Smallest local minimum (by absolute value) on the function (float)
"""
first_derivative = np.polyder(coefficients, m=1)
second_derivative = np.polyder(coefficients, m=2)
extrema = np.roots(first_derivative)
smallest_absolute_minimum = None
for extremum in extrema:
if np.polyval(second_derivative, extremum) > 0:
if smallest_absolute_minimum is None or abs(extremum) < abs(
smallest_absolute_minimum):
smallest_absolute_minimum = extremum
if smallest_absolute_minimum is None:
raise AssertionError(
f'No minima were found on function described by {coefficients}.')
return smallest_absolute_minimum
def serial_port_def(name):
"""Determine the serial port and open.
Args:
name: string of device to locate (ie. 'Arduino', 'Canakit' or 'Default')
Returns:
serial port object
"""
serial_port = None
devices = list_ports.comports()
for device in devices:
if not (device.vid and device.pid): # Not all comm ports have vid and pid
continue
if name.lower() == 'arduino':
if (device.vid in ARDUINO_VIDS and device.pid == ARDUINO_PID):
logging.debug('Arduino: %s', str(device))
serial_port = device.device
return serial.Serial(serial_port, ARDUINO_BAUDRATE, timeout=1)
elif name.lower() in ('canakit', 'default'):
if (device.vid == CANAKIT_VID and device.pid == CANAKIT_PID):
logging.debug('Canakit: %s', str(device))
serial_port = device.device
return serial.Serial(serial_port, CANAKIT_BAUDRATE,
timeout=CANAKIT_SEND_TIMEOUT,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS)
raise ValueError(f'{name} device not connected.')
def canakit_cmd_send(canakit_serial_port, cmd_str):
"""Wrapper for sending serial command to Canakit.
Args:
canakit_serial_port: port to write for canakit
cmd_str: str; value to send to device.
"""
try:
logging.debug('writing port...')
canakit_serial_port.write(CANAKIT_DATA_DELIMITER.encode())
time.sleep(CANAKIT_CMD_TIME) # This is critical for relay.
canakit_serial_port.write(cmd_str.encode())
except IOError as io_error:
raise IOError(
f'Port {CANAKIT_VID}:{CANAKIT_PID} is not open!') from io_error
def canakit_set_relay_channel_state(canakit_port, ch, state):
"""Set Canakit relay channel and state.
Waits CANAKIT_SLEEP_TIME for rotation to occur.
Args:
canakit_port: serial port object for the Canakit port.
ch: string for channel number of relay to set. '1', '2', '3', or '4'
state: string of either 'ON' or 'OFF'
"""
logging.debug('Setting relay state %s', state)
if ch in CANAKIT_VALID_CH and state in CANAKIT_VALID_CMD:
canakit_cmd_send(canakit_port, CANAKIT_SET_CMD + ch + '.' + state + '\r\n')
time.sleep(CANAKIT_SLEEP_TIME)
else:
logging.debug('Invalid ch (%s) or state (%s), no command sent.', ch, state)
def arduino_read_cmd(port):
"""Read back Arduino command from serial port."""
cmd = []
for _ in range(ARDUINO_CMD_LENGTH):
cmd.append(port.read())
return cmd
def arduino_send_cmd(port, cmd):
"""Send command to serial port."""
for i in range(ARDUINO_CMD_LENGTH):
port.write(cmd[i])
def arduino_loopback_cmd(port, cmd):
"""Send command to serial port."""
arduino_send_cmd(port, cmd)
time.sleep(ARDUINO_CMD_TIME)
return arduino_read_cmd(port)
def establish_serial_comm(port):
"""Establish connection with serial port."""
logging.debug('Establishing communication with %s', port.name)
trys = 1
hex_test = convert_to_hex(ARDUINO_TEST_CMD)
logging.debug(' test tx: %s %s %s', hex_test[0], hex_test[1], hex_test[2])
start = time.time()
while time.time() < start + ARDUINO_START_TIMEOUT:
try:
cmd_read = arduino_loopback_cmd(port, ARDUINO_TEST_CMD)
except serial.serialutil.SerialException as _:
logging.debug('Port in use, trying again...')
continue
hex_read = convert_to_hex(cmd_read)
logging.debug(' test rx: %s %s %s', hex_read[0], hex_read[1], hex_read[2])
if cmd_read != list(ARDUINO_TEST_CMD):
trys += 1
else:
logging.debug(' Arduino comm established after %d try(s)', trys)
break
else:
raise AssertionError(f'Arduino comm not established after {trys} tries '
f'and {ARDUINO_START_TIMEOUT} seconds')
def convert_to_hex(cmd):
return [('%0.2x' % int(codecs.encode(x, 'hex_codec'), 16) if x else '--')
for x in cmd]
def arduino_rotate_servo_to_angle(ch, angle, serial_port, move_time):
"""Rotate servo to the specified angle.
Args:
ch: str; servo to rotate in ARDUINO_VALID_CH
angle: int; servo angle to move to
serial_port: object; serial port
move_time: int; time in seconds
"""
if angle < 0 or angle > ARDUINO_ANGLE_MAX:
logging.debug('Angle must be between 0 and %d.', ARDUINO_ANGLE_MAX)
angle = 0
if angle > ARDUINO_ANGLE_MAX:
angle = ARDUINO_ANGLE_MAX
cmd = [struct.pack('B', i) for i in [ARDUINO_START_BYTE, int(ch), angle]]
arduino_send_cmd(serial_port, cmd)
time.sleep(move_time)
def arduino_rotate_servo(ch, angles, move_time, serial_port):
"""Rotate servo through 'angles'.
Args:
ch: str; servo to rotate
angles: list of ints; servo angles to move to
move_time: int; time required to allow for arduino movement
serial_port: object; serial port
"""
for angle in angles:
angle_norm = int(round(angle*ARDUINO_ANGLE_MAX/HS755HB_ANGLE_MAX, 0))
arduino_rotate_servo_to_angle(ch, angle_norm, serial_port, move_time)
def rotation_rig(rotate_cntl, rotate_ch, num_rotations, angles, servo_speed,
move_time, arduino_serial_port):
"""Rotate the phone n times using rotate_cntl and rotate_ch defined.
rotate_ch is hard wired and must be determined from physical setup.
If using Arduino, serial port must be initialized and communication must be
established before rotation.
Args:
rotate_cntl: str to identify 'arduino', 'canakit' or 'external' controller.
rotate_ch: str to identify rotation channel number.
num_rotations: int number of rotations.
angles: list of ints; servo angle to move to.
servo_speed: int number of move speed between [1, 255].
move_time: int time required to allow for arduino movement.
arduino_serial_port: optional initialized serial port object
"""
logging.debug('Controller: %s, ch: %s', rotate_cntl, rotate_ch)
if arduino_serial_port:
# initialize servo at origin
logging.debug('Moving servo to origin')
arduino_rotate_servo_to_angle(rotate_ch, 0, arduino_serial_port, 1)
# set servo speed
set_servo_speed(rotate_ch, servo_speed, arduino_serial_port, delay=0)
elif rotate_cntl.lower() == 'canakit':
canakit_serial_port = serial_port_def('Canakit')
elif rotate_cntl.lower() == 'external':
logging.info('External rotation control.')
else:
logging.info('No rotation rig defined. Manual test: rotate phone by hand.')
# rotate phone
logging.debug('Rotating phone %dx', num_rotations)
for _ in range(num_rotations):
if rotate_cntl == 'arduino':
arduino_rotate_servo(rotate_ch, angles, move_time, arduino_serial_port)
elif rotate_cntl == 'canakit':
canakit_set_relay_channel_state(canakit_serial_port, rotate_ch, 'ON')
canakit_set_relay_channel_state(canakit_serial_port, rotate_ch, 'OFF')
logging.debug('Finished rotations')
if rotate_cntl == 'arduino':
logging.debug('Moving servo to origin')
arduino_rotate_servo_to_angle(rotate_ch, 0, arduino_serial_port, 1)
def set_servo_speed(ch, servo_speed, serial_port, delay=0):
"""Set servo to specified speed.
Args:
ch: str; servo to turn on in ARDUINO_VALID_CH
servo_speed: int; value of speed between 1 and 255
serial_port: object; serial port
delay: int; time in seconds
"""
logging.debug('Servo speed: %d', servo_speed)
if servo_speed < ARDUINO_SERVO_SPEED_MIN:
logging.debug('Servo speed must be >= %d.', ARDUINO_SERVO_SPEED_MIN)
servo_speed = ARDUINO_SERVO_SPEED_MIN
elif servo_speed > ARDUINO_SERVO_SPEED_MAX:
logging.debug('Servo speed must be <= %d.', ARDUINO_SERVO_SPEED_MAX)
servo_speed = ARDUINO_SERVO_SPEED_MAX
cmd = [struct.pack('B', i) for i in [ARDUINO_SPEED_START_BYTE,
int(ch), servo_speed]]
arduino_send_cmd(serial_port, cmd)
time.sleep(delay)
def calc_max_rotation_angle(rotations, sensor_type):
"""Calculates the max angle of deflection from rotations.
Args:
rotations: numpy array of rotation per event
sensor_type: string 'Camera' or 'Gyro'
Returns:
maximum angle of rotation for the given rotations
"""
rotations *= _RADS_TO_DEGS
rotations_sum = np.cumsum(rotations)
rotation_max = max(rotations_sum)
rotation_min = min(rotations_sum)
logging.debug('%s min: %.2f, max %.2f rotation (degrees)',
sensor_type, rotation_min, rotation_max)
logging.debug('%s max rotation: %.2f degrees',
sensor_type, (rotation_max-rotation_min))
return rotation_max-rotation_min
def get_gyro_rotations(gyro_events, cam_times):
"""Get the rotation values of the gyro.
Integrates the gyro data between each camera frame to compute an angular
displacement.
Args:
gyro_events: List of gyro event objects.
cam_times: Array of N camera times, one for each frame.
Returns:
Array of N-1 gyro rotation measurements (rads/s).
"""
gyro_times = np.array([e['time'] for e in gyro_events])
all_gyro_rots = np.array([e['z'] for e in gyro_events])
gyro_rots = []
if gyro_times[0] > cam_times[0] or gyro_times[-1] < cam_times[-1]:
raise AssertionError('Gyro times do not bound camera times! '
f'gyro: {gyro_times[0]:.0f} -> {gyro_times[-1]:.0f} '
f'cam: {cam_times[0]} -> {cam_times[-1]} (ns).')
# Integrate the gyro data between each pair of camera frame times.
for i_cam in range(len(cam_times)-1):
# Get the window of gyro samples within the current pair of frames.
# Note: bisect always picks first gyro index after camera time.
t_cam0 = cam_times[i_cam]
t_cam1 = cam_times[i_cam+1]
i_gyro_window0 = bisect.bisect(gyro_times, t_cam0)
i_gyro_window1 = bisect.bisect(gyro_times, t_cam1)
gyro_sum = 0
# Integrate samples within the window.
for i_gyro in range(i_gyro_window0, i_gyro_window1):
gyro_val = all_gyro_rots[i_gyro+1]
t_gyro0 = gyro_times[i_gyro]
t_gyro1 = gyro_times[i_gyro+1]
t_gyro_delta = (t_gyro1 - t_gyro0) * _NSEC_TO_SEC
gyro_sum += gyro_val * t_gyro_delta
# Handle the fractional intervals at the sides of the window.
for side, i_gyro in enumerate([i_gyro_window0-1, i_gyro_window1]):
gyro_val = all_gyro_rots[i_gyro+1]
t_gyro0 = gyro_times[i_gyro]
t_gyro1 = gyro_times[i_gyro+1]
t_gyro_delta = (t_gyro1 - t_gyro0) * _NSEC_TO_SEC
if side == 0:
f = (t_cam0 - t_gyro0) / (t_gyro1 - t_gyro0)
frac_correction = gyro_val * t_gyro_delta * (1.0 - f)
gyro_sum += frac_correction
else:
f = (t_cam1 - t_gyro0) / (t_gyro1 - t_gyro0)
frac_correction = gyro_val * t_gyro_delta * f
gyro_sum += frac_correction
gyro_rots.append(gyro_sum)
gyro_rots = np.array(gyro_rots)
return gyro_rots
def procrustes_rotation(x, y):
"""Performs a Procrustes analysis to conform points in x to y.
Procrustes analysis determines a linear transformation (translation,
reflection, orthogonal rotation and scaling) of the points in y to best
conform them to the points in matrix x, using the sum of squared errors
as the metric for fit criterion.
Args:
x: Target coordinate matrix
y: Input coordinate matrix
Returns:
The rotation component of the transformation that maps x to y.
"""
x0 = (x-x.mean(0)) / np.sqrt(((x-x.mean(0))**2.0).sum())
y0 = (y-y.mean(0)) / np.sqrt(((y-y.mean(0))**2.0).sum())
u, _, vt = np.linalg.svd(np.dot(x0.T, y0), full_matrices=False)
return np.dot(vt.T, u.T)
def get_cam_rotations(frames, facing, h, file_name_stem,
start_frame, stabilized_video=False):
"""Get the rotations of the camera between each pair of frames.
Takes N frames and returns N-1 angular displacements corresponding to the
rotations between adjacent pairs of frames, in radians.
Only takes feature points from center so that rotation measured has less
rolling shutter effect.
Requires FEATURE_PTS_MIN to have enough data points for accurate measurements.
Uses FEATURE_PARAMS for cv2 to identify features in checkerboard images.
Ensures camera rotates enough if not calling with stabilized video.
Args:
frames: List of N images (as RGB numpy arrays).
facing: Direction camera is facing.
h: Pixel height of each frame.
file_name_stem: file name stem including location for data.
start_frame: int; index to start at
stabilized_video: Boolean; if called with stabilized video
Returns:
numpy array of N-1 camera rotation measurements (rad).
"""
gframes = []
for frame in frames:
frame = (frame * 255.0).astype(np.uint8) # cv2 uses [0, 255]
gframes.append(cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY))
num_frames = len(gframes)
logging.debug('num_frames: %d', num_frames)
# create mask
ymin = int(h * (1 - _FEATURE_MARGIN) / 2)
ymax = int(h * (1 + _FEATURE_MARGIN) / 2)
pre_mask = np.zeros_like(gframes[0])
pre_mask[ymin:ymax, :] = 255
for masking in ['post', 'pre']: # Do post-masking (original) method 1st
logging.debug('Using %s masking method', masking)
rotations = []
for i in range(1, num_frames):
j = i - 1
gframe0 = gframes[j]
gframe1 = gframes[i]
if masking == 'post':
p0 = cv2.goodFeaturesToTrack(
gframe0, mask=None, **_CV2_FEATURE_PARAMS_POSTMASK)
post_mask = (p0[:, 0, 1] >= ymin) & (p0[:, 0, 1] <= ymax)
p0_filtered = p0[post_mask]
else:
p0_filtered = cv2.goodFeaturesToTrack(
gframe0, mask=pre_mask, **_CV2_FEATURE_PARAMS_PREMASK)
num_features = len(p0_filtered)
if num_features < _FEATURE_PTS_MIN:
for pt in np.rint(p0_filtered).astype(int):
x, y = pt[0][0], pt[0][1]
cv2.circle(frames[j], (x, y), 3, (100, 255, 255), -1)
image_processing_utils.write_image(
frames[j], f'{file_name_stem}_features{j+start_frame:03d}.png')
msg = (f'Not enough features in frame {j+start_frame}. Need at least '
f'{_FEATURE_PTS_MIN} features, got {num_features}.')
if masking == 'pre':
raise AssertionError(msg)
else:
logging.debug(msg)
break
else:
logging.debug('Number of features in frame %s is %d',
str(j+start_frame).zfill(3), num_features)
p1, st, _ = cv2.calcOpticalFlowPyrLK(gframe0, gframe1, p0_filtered, None,
**_CV2_LK_PARAMS)
tform = procrustes_rotation(p0_filtered[st == 1], p1[st == 1])
if facing == camera_properties_utils.LENS_FACING['BACK']:
rotation = -math.atan2(tform[0, 1], tform[0, 0])
elif facing == camera_properties_utils.LENS_FACING['FRONT']:
rotation = math.atan2(tform[0, 1], tform[0, 0])
else:
raise AssertionError(f'Unknown lens facing: {facing}.')
rotations.append(rotation)
if i == 1:
# Save debug visualization of features that are being
# tracked in the first frame.
frame = frames[j]
for x, y in np.rint(p0_filtered[st == 1]).astype(int):
cv2.circle(frame, (x, y), 3, (100, 255, 255), -1)
image_processing_utils.write_image(
frame, f'{file_name_stem}_features{j+start_frame:03d}.png')
if i == num_frames-1:
logging.debug('Correct num of frames found: %d', i)
break # exit if enough features in all frames
if i != num_frames-1:
raise AssertionError('Neither method found enough features in all frames')
rotations = np.array(rotations)
rot_per_frame_max = max(abs(rotations))
logging.debug('Max rotation in frame: %.2f degrees',
rot_per_frame_max*_RADS_TO_DEGS)
if stabilized_video:
logging.debug('Skipped camera rotation check due to stabilized video.')
else:
if rot_per_frame_max < _ROTATION_PER_FRAME_MIN:
raise AssertionError(f'Device not moved enough: {rot_per_frame_max:.3f} '
f'movement. THRESH: {_ROTATION_PER_FRAME_MIN} rads.')
else:
logging.debug('Device movement exceeds %.2f degrees',
_ROTATION_PER_FRAME_MIN*_RADS_TO_DEGS)
return rotations
def get_best_alignment_offset(cam_times, cam_rots, gyro_events, degree=2):
"""Find the best offset to align the camera and gyro motion traces.
This function integrates the shifted gyro data between camera samples
for a range of candidate shift values, and returns the shift that
result in the best correlation.
Uses a correlation distance metric between the curves, where a smaller
value means that the curves are better-correlated.
Fits a curve to the correlation distance data to measure the minima more
accurately, by looking at the correlation distances within a range of
+/- 10ms from the measured best score; note that this will use fewer
than the full +/- 10 range for the curve fit if the measured score
(which is used as the center of the fit) is within 10ms of the edge of
the +/- 50ms candidate range.
Args:
cam_times: Array of N camera times, one for each frame.
cam_rots: Array of N-1 camera rotation displacements (rad).
gyro_events: List of gyro event objects.
degree: Degree of polynomial
Returns:
Best alignment offset(ms), fit coefficients, candidates, and distances.
"""
# Measure the correlation distance over defined shift
shift_candidates = np.arange(-_CORR_TIME_OFFSET_MAX,
_CORR_TIME_OFFSET_MAX+_CORR_TIME_OFFSET_STEP,
_CORR_TIME_OFFSET_STEP).tolist()
spatial_distances = []
for shift in shift_candidates:
shifted_cam_times = cam_times + shift*_MSEC_TO_NSEC
gyro_rots = get_gyro_rotations(gyro_events, shifted_cam_times)
spatial_distance = scipy.spatial.distance.correlation(cam_rots, gyro_rots)
logging.debug('shift %.1fms spatial distance: %.5f', shift,
spatial_distance)
spatial_distances.append(spatial_distance)
best_corr_dist = min(spatial_distances)
coarse_best_shift = shift_candidates[spatial_distances.index(best_corr_dist)]
logging.debug('Best shift without fitting is %.4f ms', coarse_best_shift)
# Fit a polynomial around coarse_best_shift to extract best fit
i = spatial_distances.index(best_corr_dist)
i_poly_fit_min = i - _COARSE_FIT_RANGE
i_poly_fit_max = i + _COARSE_FIT_RANGE + 1
shift_candidates = shift_candidates[i_poly_fit_min:i_poly_fit_max]
spatial_distances = spatial_distances[i_poly_fit_min:i_poly_fit_max]
logging.debug('Polynomial degree: %d', degree)
fit_coeffs, residuals, _, _, _ = np.polyfit(
shift_candidates, spatial_distances, degree, full=True
)
logging.debug('Fit coefficients: %s', fit_coeffs)
logging.debug('Residuals: %s', residuals)
total_sum_of_squares = np.sum(
(spatial_distances - np.mean(spatial_distances)) ** 2
)
# Calculate r-squared on the entire domain for debugging
r_squared = 1 - residuals[0] / total_sum_of_squares
logging.debug('r^2 on the entire domain: %f', r_squared)
# Calculate r-squared near the best shift
domain_around_best_shift = [coarse_best_shift - _SHIFT_DOMAIN_RADIUS,
coarse_best_shift + _SHIFT_DOMAIN_RADIUS]
logging.debug('Calculating r^2 on the limited domain of [%f, %f]',
domain_around_best_shift[0], domain_around_best_shift[1])
small_shifts_and_distances = [
(x, y)
for x, y in zip(shift_candidates, spatial_distances)
if domain_around_best_shift[0] <= x <= domain_around_best_shift[1]
]
small_shift_candidates, small_spatial_distances = zip(
*small_shifts_and_distances
)
logging.debug('Shift candidates on limited domain: %s',
small_shift_candidates)
logging.debug('Spatial distances on limited domain: %s',
small_spatial_distances)
limited_residuals = np.sum(
(np.polyval(fit_coeffs, small_shift_candidates) - small_spatial_distances)
** 2
)
logging.debug('Residuals on limited domain: %s', limited_residuals)
limited_total_sum_of_squares = np.sum(
(small_spatial_distances - np.mean(small_spatial_distances)) ** 2
)
limited_r_squared = 1 - limited_residuals / limited_total_sum_of_squares
logging.debug('r^2 on limited domain: %f', limited_r_squared)
# Calculate exact_best_shift (x where y is minimum of parabola)
exact_best_shift = smallest_absolute_minimum_of_polynomial(fit_coeffs)
if abs(coarse_best_shift - exact_best_shift) > 2.0:
raise AssertionError(
f'Test failed. Bad fit to time-shift curve. Coarse best shift: '
f'{coarse_best_shift}, Exact best shift: {exact_best_shift}.')
# Check fit of polynomial near the best shift
if not math.isclose(limited_r_squared, 1, abs_tol=_R_SQUARED_TOLERANCE):
logging.debug('r-squared on domain [%f, %f] was %f, expected 1.0, '
'ATOL: %f',
domain_around_best_shift[0], domain_around_best_shift[1],
limited_r_squared, _R_SQUARED_TOLERANCE)
return None
return exact_best_shift, fit_coeffs, shift_candidates, spatial_distances
def plot_camera_rotations(cam_rots, start_frame, video_quality,
plot_name_stem):
"""Plot the camera rotations.
Args:
cam_rots: np array of camera rotations angle per frame
start_frame: int value of start frame
video_quality: str for video quality identifier
plot_name_stem: str (with path) of what to call plot
"""
pylab.figure(video_quality)
frames = range(start_frame, len(cam_rots)+start_frame)
pylab.title(f'Camera rotation vs frame {video_quality}')
pylab.plot(frames, cam_rots*_RADS_TO_DEGS, '-ro', label='x')
pylab.xlabel('frame #')
pylab.ylabel('camera rotation (degrees)')
matplotlib.pyplot.savefig(f'{plot_name_stem}_cam_rots.png')
pylab.close(video_quality)
def plot_gyro_events(gyro_events, plot_name, log_path):
"""Plot x, y, and z on the gyro events.
Samples are grouped into NUM_GYRO_PTS_TO_AVG groups and averaged to minimize
random spikes in data.
Args:
gyro_events: List of gyroscope events.
plot_name: name of plot(s).
log_path: location to save data.
"""
nevents = (len(gyro_events) // _NUM_GYRO_PTS_TO_AVG) * _NUM_GYRO_PTS_TO_AVG
gyro_events = gyro_events[:nevents]
times = np.array([(e['time'] - gyro_events[0]['time']) * _NSEC_TO_SEC
for e in gyro_events])
x = np.array([e['x'] for e in gyro_events])
y = np.array([e['y'] for e in gyro_events])
z = np.array([e['z'] for e in gyro_events])
# Group samples into size-N groups & average each together to minimize random
# spikes in data.
times = times[_NUM_GYRO_PTS_TO_AVG//2::_NUM_GYRO_PTS_TO_AVG]
x = x.reshape(nevents//_NUM_GYRO_PTS_TO_AVG, _NUM_GYRO_PTS_TO_AVG).mean(1)
y = y.reshape(nevents//_NUM_GYRO_PTS_TO_AVG, _NUM_GYRO_PTS_TO_AVG).mean(1)
z = z.reshape(nevents//_NUM_GYRO_PTS_TO_AVG, _NUM_GYRO_PTS_TO_AVG).mean(1)
pylab.figure(plot_name)
# x & y on same axes
pylab.subplot(2, 1, 1)
pylab.title(f'{plot_name}(mean of {_NUM_GYRO_PTS_TO_AVG} pts)')
pylab.plot(times, x, 'r', label='x')
pylab.plot(times, y, 'g', label='y')
pylab.ylim([np.amin(z)/4, np.amax(z)/4]) # zoom in 4x from z axis
pylab.ylabel('gyro x,y movement (rads/s)')
pylab.legend()
# z on separate axes
pylab.subplot(2, 1, 2)
pylab.plot(times, z, 'b', label='z')
pylab.ylim([np.amin(z), np.amax(z)])
pylab.xlabel('time (seconds)')
pylab.ylabel('gyro z movement (rads/s)')
pylab.legend()
file_name = os.path.join(log_path, plot_name)
matplotlib.pyplot.savefig(f'{file_name}_gyro_events.png')
pylab.close(plot_name)
z_max = max(abs(z))
logging.debug('z_max: %.3f', z_max)
if z_max > _GYRO_ROTATION_PER_SEC_MAX:
raise AssertionError(
f'Phone moved too rapidly! Please confirm controller firmware. '
f'Max: {z_max:.3f}, TOL: {_GYRO_ROTATION_PER_SEC_MAX} rads/s')
def conv_acceleration_to_movement(gyro_events, video_delay_time):
"""Convert gyro_events time and speed to movement during video time.
Args:
gyro_events: sorted dict of entries with 'time', 'x', 'y', and 'z'
video_delay_time: time at which video starts (and the video's duration)
Returns:
'z' acceleration converted to movement for times around VIDEO playing.
"""
gyro_times = np.array([e['time'] for e in gyro_events])
gyro_speed = np.array([e['z'] for e in gyro_events])
gyro_time_min = gyro_times[0]
logging.debug('gyro start time: %dns', gyro_time_min)
logging.debug('gyro stop time: %dns', gyro_times[-1])
gyro_rotations = []
video_time_start = gyro_time_min + video_delay_time *_SEC_TO_NSEC
video_time_stop = video_time_start + video_delay_time *_SEC_TO_NSEC
logging.debug('video start time: %dns', video_time_start)
logging.debug('video stop time: %dns', video_time_stop)
for i, t in enumerate(gyro_times):
if video_time_start <= t <= video_time_stop:
gyro_rotations.append((gyro_times[i]-gyro_times[i-1])/_SEC_TO_NSEC *
gyro_speed[i])
return np.array(gyro_rotations)