# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Verify image and inertial sensor events are well synchronized."""


import json
import logging
import math
import multiprocessing
import os
import sys
import time

from matplotlib import pylab
import matplotlib.pyplot
from mobly import test_runner
import numpy as np
import scipy.spatial

import cv2
import its_base_test
import camera_properties_utils
import capture_request_utils
import image_processing_utils
import its_session_utils
import sensor_fusion_utils

_CAM_FRAME_RANGE_MAX = 9.0  # Seconds: max allowed camera frame range.
_FEATURE_MARGIN = 0.20  # Only take feature points from center 20% so that
                        # rotation measured has less rolling shutter effect.
_FEATURE_PTS_MIN = 30  # Min number of feature pts to perform rotation analysis.
# Parameters for cv2.goodFeaturesToTrack feature detection.
# 'POSTMASK' is the measurement method used in all previous versions of
# Android: it finds the best features on the entire frame and then masks them
# to the vertical center FEATURE_MARGIN band for the measurement.
# 'PREMASK' is a fallback method used when FEATURE_PTS_MIN features are not
# found with 'POSTMASK': it finds the best 2*FEATURE_PTS_MIN features within
# the FEATURE_MARGIN band of the frame.
_CV2_FEATURE_PARAMS_POSTMASK = dict(maxCorners=240,
                                    qualityLevel=0.3,
                                    minDistance=7,
                                    blockSize=7)
_CV2_FEATURE_PARAMS_PREMASK = dict(maxCorners=2*_FEATURE_PTS_MIN,
                                   qualityLevel=0.3,
                                   minDistance=7,
                                   blockSize=7)
_GYRO_SAMP_RATE_MIN = 100.0  # Samples/second: min gyro sample rate.
_CV2_LK_PARAMS = dict(winSize=(15, 15),
                      maxLevel=2,
                      criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
                                10, 0.03))  # cv2.calcOpticalFlowPyrLK params.

_NAME = os.path.splitext(os.path.basename(__file__))[0]
_ARDUINO_ANGLES = (0, 90)
_ARDUINO_MOVE_TIME = 2
_ARDUINO_SERVO_SPEED = 20
_NUM_ROTATIONS = 10
_START_FRAME = 1
_FRAME_DELTA_TOL = 1.5  # 50% margin over nominal FPS of captures.

# Constants to convert between different units (for clarity).
_SEC_TO_MSEC = 1000
_MSEC_TO_NSEC = 1000*1000
_NSEC_TO_SEC = 1.0E-9
_CM_TO_M = 1E-2
_RADS_TO_DEGS = 180/math.pi

# PASS/FAIL thresholds.
_CORR_DIST_THRESH_MAX = 0.005
_OFFSET_MS_THRESH_MAX = 1  # milliseconds
_ROTATION_PER_FRAME_MIN = 0.001  # rads (per frame)

# Parameters from the Android S refactor.

# Set maximum exposure time to reduce motion blur:
#   blur = velocity * exp_time * N_pixels / FOV
#   blur: 3 pixels, v: chart R (17.7cm) / rot_time (1.5s), N: 640, FOV: 30cm
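# Rearranged for the exposure cap:
#   exp_time = blur * FOV / (velocity * N_pixels)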
_EXP_MAX = 4*_MSEC_TO_NSEC  # 4 ms
_GYRO_INIT_WAIT_TIME = 0.5  # Seconds to wait for gyro to stabilize.
_GYRO_POST_WAIT_TIME = 0.2  # Seconds to wait to capture some extra gyro data.
_IMG_SIZE_MAX = 640 * 480  # Maximum image size.
_NUM_FRAMES_MAX = 300  # fps*test_length should be < this for smooth captures.


def _collect_data(cam, fps, w, h, test_length, rot_rig, chart_dist, log_path):
  """Capture a new set of data from the device.

  Captures camera frames while the user is moving the device in the prescribed
  manner. Note that since the capture request is a manual request, optical
  image stabilization (OIS) is disabled.

  Args:
    cam: camera object.
    fps: frames per second capture rate.
    w: pixel width of frames.
    h: pixel height of frames.
    test_length: length of time for test in seconds.
    rot_rig: dict with 'cntl' and 'ch' defined.
    chart_dist: float value of distance to chart in meters.
    log_path: location to save data.

  Returns:
    events: dictionary of gyro events, camera frame metadata and lens facing.
    frames: list of RGB images as numpy arrays.
  """
  logging.debug('Starting sensor event collection')
  props = cam.get_camera_properties()
  props = cam.override_with_hidden_physical_camera_props(props)
  camera_properties_utils.skip_unless(
      camera_properties_utils.sensor_fusion_capable(props))

  # Start camera rotation.
  p = multiprocessing.Process(
      target=sensor_fusion_utils.rotation_rig,
      args=(rot_rig['cntl'], rot_rig['ch'], _NUM_ROTATIONS,
            _ARDUINO_ANGLES, _ARDUINO_SERVO_SPEED, _ARDUINO_MOVE_TIME,))
  p.start()

  cam.start_sensor_events()

  # Sleep a while for gyro events to stabilize.
  time.sleep(_GYRO_INIT_WAIT_TIME)

  # Capture frames.
  facing = props['android.lens.facing']
  if (facing != camera_properties_utils.LENS_FACING_FRONT and
      facing != camera_properties_utils.LENS_FACING_BACK):
    raise AssertionError(f'Unknown lens facing: {facing}.')

  fmt = {'format': 'yuv', 'width': w, 'height': h}
  s, e, _, _, _ = cam.do_3a(get_results=True, do_af=False)
  logging.debug('3A ISO: %d, exp: %.3fms', s, e/_MSEC_TO_NSEC)
  if e > _EXP_MAX:
    logging.debug('Re-assigning exposure time')
    s *= int(round(e / _EXP_MAX))
    e = _EXP_MAX
    s_max = props['android.sensor.info.sensitivityRange'][1]
    if s > s_max:
      logging.debug('Calculated ISO too large! ISO: %d, MAX: %d '
                    'Setting ISO to max analog gain value.', s, s_max)
      s = s_max
  req = capture_request_utils.manual_capture_request(s, e)
  capture_request_utils.turn_slow_filters_off(props, req)
  fd_min = props['android.lens.info.minimumFocusDistance']
  fd_chart = 1 / chart_dist
  fd_meas = min(fd_min, fd_chart)
  logging.debug('fd_min: %.3f, fd_chart: %.3f, fd_meas: %.3f',
                fd_min, fd_chart, fd_meas)
  req['android.lens.focusDistance'] = fd_meas
  req['android.control.aeTargetFpsRange'] = [fps, fps]
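  # 1 / _NSEC_TO_SEC is 1e9, so the requested frame duration below is
  # 1e9 / fps nanoseconds.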
  req['android.sensor.frameDuration'] = int(1 / _NSEC_TO_SEC / fps)
  logging.debug('Capturing %dx%d with ISO %d, exp %.3fms at %dfps',
                w, h, s, e/_MSEC_TO_NSEC, fps)
  caps = cam.do_capture([req] * int(fps * test_length), fmt)

  # Capture a bit more gyro samples for use in get_best_alignment_offset.
  time.sleep(_GYRO_POST_WAIT_TIME)

  # Get the gyro events.
  logging.debug('Reading out inertial sensor events')
  gyro = cam.get_sensor_events()['gyro']
  logging.debug('Number of gyro samples %d', len(gyro))

  # Combine the gyro and camera events into a single structure.
  logging.debug('Dumping event data')
  starts = [cap['metadata']['android.sensor.timestamp'] for cap in caps]
  exptimes = [cap['metadata']['android.sensor.exposureTime'] for cap in caps]
  readouts = [cap['metadata']['android.sensor.rollingShutterSkew']
              for cap in caps]
  events = {'gyro': gyro, 'cam': list(zip(starts, exptimes, readouts)),
            'facing': facing}
  with open('%s_events.txt' % os.path.join(log_path, _NAME), 'w') as f:
    f.write(json.dumps(events))

  # Convert frames to RGB.
  logging.debug('Dumping frames')
  frames = []
  for i, cap in enumerate(caps):
    img = image_processing_utils.convert_capture_to_rgb_image(cap)
    frames.append(img)
    image_processing_utils.write_image(img, '%s_frame%03d.png' % (
        os.path.join(log_path, _NAME), i))
  return events, frames


def _get_cam_times(cam_events, fps):
  """Get the camera frame times.

  Assigns a time to each frame. Assumes the image is captured instantaneously
  at the midpoint of its exposure plus rolling shutter readout window.

  Args:
    cam_events: List of (start_exposure, exposure_time, readout_duration)
      tuples, one per captured frame, with times in nanoseconds.
    fps: Frames per second of the capture (float).

  Returns:
    frame_times: Array of N times, one corresponding to the 'middle' of the
      exposure (including readout) of each frame.
  """
  starts = np.array([start for start, exptime, readout in cam_events])
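  # Largest gap between consecutive frame-start timestamps; a gap larger than
  # _FRAME_DELTA_TOL (1.5) times the nominal frame period indicates a dropped
  # frame.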
  max_frame_delta_ms = (np.amax(np.subtract(starts[1:], starts[0:-1])) /
                        _MSEC_TO_NSEC)
  logging.debug('Maximum frame delta: %.3f ms', max_frame_delta_ms)
  frame_delta_tol_ms = _FRAME_DELTA_TOL * (1 / fps) * _SEC_TO_MSEC
  if max_frame_delta_ms > frame_delta_tol_ms:
    raise AssertionError(f'Frame drop! Max delta: {max_frame_delta_ms:.3f}ms, '
                         f'ATOL: {frame_delta_tol_ms}ms')
  exptimes = np.array([exptime for start, exptime, readout in cam_events])
  if not np.all(exptimes == exptimes[0]):
    raise AssertionError(f'Exposure times vary in frames! {exptimes}')
  readouts = np.array([readout for start, exptime, readout in cam_events])
  if not np.all(readouts == readouts[0]):
    raise AssertionError(f'Rolling shutter not always equal! {readouts}')
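  # Each frame's time is its start timestamp plus half of (exposure time +
  # rolling shutter readout), i.e. the midpoint of the frame's capture window.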
  frame_times = starts + (exptimes + readouts) / 2.0
  return frame_times

def _procrustes_rotation(x, y):
  """Performs a Procrustes analysis to conform points in y to points in x.

  Procrustes analysis determines a linear transformation (translation,
  reflection, orthogonal rotation and scaling) of the points in y to best
  conform them to the points in matrix x, using the sum of squared errors
  as the goodness-of-fit criterion.

  Args:
    x: Target coordinate matrix.
    y: Input coordinate matrix.

  Returns:
    The rotation component of the transformation that conforms y to x.
  """
  # Center each point set on its centroid and scale to unit Frobenius norm.
  x0 = (x-x.mean(0)) / np.sqrt(((x-x.mean(0))**2.0).sum())
  y0 = (y-y.mean(0)) / np.sqrt(((y-y.mean(0))**2.0).sum())
  # The optimal rotation is the orthogonal Procrustes solution obtained from
  # the SVD of the cross-covariance of the normalized point sets.
  u, _, vt = np.linalg.svd(np.dot(x0.T, y0), full_matrices=False)
  return np.dot(vt.T, u.T)
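
# Example (illustrative, mirroring the usage in _get_cam_rotations below): for
# matched feature point sets p0 and p1 tracked between two frames,
#   tform = _procrustes_rotation(p0[st == 1], p1[st == 1])
#   rot = math.atan2(tform[0, 1], tform[0, 0])
# recovers the in-plane rotation angle between the frames in radians.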


def _get_cam_rotations(frames, facing, h, log_path):
  """Get the rotations of the camera between each pair of frames.

  Takes N frames and returns N-1 angular displacements corresponding to the
  rotations between adjacent pairs of frames, in radians.
  Only takes feature points from the center of each frame so that the measured
  rotation has less rolling shutter effect.
  Requires at least _FEATURE_PTS_MIN feature points for accurate measurements.
  Uses the _CV2_FEATURE_PARAMS_* dicts with cv2.goodFeaturesToTrack to identify
  features in the checkerboard images.
  Ensures the camera rotates enough over the capture.

  Args:
    frames: List of N images (as RGB numpy arrays).
    facing: Direction camera is facing.
    h: Pixel height of each frame.
    log_path: Location for data.

  Returns:
    numpy array of N-1 camera rotation measurements (rad).
  """
  gframes = []
  file_name_stem = os.path.join(log_path, _NAME)
  for frame in frames:
    frame = (frame * 255.0).astype(np.uint8)  # cv2 uses [0, 255]
    gframes.append(cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY))
  num_frames = len(gframes)
  logging.debug('num_frames: %d', num_frames)

  # Create a mask covering only the vertical center _FEATURE_MARGIN band.
  ymin = int(h * (1 - _FEATURE_MARGIN) / 2)
  ymax = int(h * (1 + _FEATURE_MARGIN) / 2)
  pre_mask = np.zeros_like(gframes[0])
  pre_mask[ymin:ymax, :] = 255

  for masking in ['post', 'pre']:  # Do post-masking (original) method 1st.
    logging.debug('Using %s masking method', masking)
    rots = []
    for i in range(1, num_frames):
      j = i - 1
      gframe0 = gframes[j]
      gframe1 = gframes[i]
      if masking == 'post':
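        # Detect features over the whole frame, then keep only those whose
        # y-coordinate lies inside the center _FEATURE_MARGIN band.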
        p0 = cv2.goodFeaturesToTrack(
            gframe0, mask=None, **_CV2_FEATURE_PARAMS_POSTMASK)
        post_mask = (p0[:, 0, 1] >= ymin) & (p0[:, 0, 1] <= ymax)
        p0_filtered = p0[post_mask]
      else:
        p0_filtered = cv2.goodFeaturesToTrack(
            gframe0, mask=pre_mask, **_CV2_FEATURE_PARAMS_PREMASK)
      num_features = len(p0_filtered)
      if num_features < _FEATURE_PTS_MIN:
        for pt in np.rint(p0_filtered).astype(int):
          x, y = pt[0][0], pt[0][1]
          cv2.circle(frames[j], (x, y), 3, (100, 255, 255), -1)
        image_processing_utils.write_image(
            frames[j], f'{file_name_stem}_features{j+_START_FRAME:03d}.png')
        msg = (f'Not enough features in frame {j+_START_FRAME}. Need at least '
               f'{_FEATURE_PTS_MIN} features, got {num_features}.')
        if masking == 'pre':
          raise AssertionError(msg)
        else:
          logging.debug(msg)
          break
      else:
        logging.debug('Number of features in frame %s is %d',
                      str(j+_START_FRAME).zfill(3), num_features)
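      # Track the surviving features into the next frame with pyramidal
      # Lucas-Kanade optical flow; st == 1 marks points tracked successfully.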
      p1, st, _ = cv2.calcOpticalFlowPyrLK(gframe0, gframe1, p0_filtered, None,
                                           **_CV2_LK_PARAMS)
      tform = _procrustes_rotation(p0_filtered[st == 1], p1[st == 1])
      if facing == camera_properties_utils.LENS_FACING_BACK:
        rot = -math.atan2(tform[0, 1], tform[0, 0])
      elif facing == camera_properties_utils.LENS_FACING_FRONT:
        rot = math.atan2(tform[0, 1], tform[0, 0])
      else:
        raise AssertionError(f'Unknown lens facing: {facing}.')
      rots.append(rot)
      if i == 1:
        # Save debug visualization of features that are being
        # tracked in the first frame.
        frame = frames[j]
        for x, y in np.rint(p0_filtered[st == 1]).astype(int):
          cv2.circle(frame, (x, y), 3, (100, 255, 255), -1)
        image_processing_utils.write_image(
            frame, f'{file_name_stem}_features{j+_START_FRAME:03d}.png')
    if i == num_frames-1:
      logging.debug('Correct num of frames found: %d', i)
      break  # exit if enough features in all frames
  if i != num_frames-1:
    raise AssertionError('Neither method found enough features in all frames')

  rots = np.array(rots)
  rot_per_frame_max = max(abs(rots))
  logging.debug('Max rotation: %.4f radians', rot_per_frame_max)
  if rot_per_frame_max < _ROTATION_PER_FRAME_MIN:
    raise AssertionError(f'Device not moved enough: {rot_per_frame_max:.3f} '
                         f'movement. THRESH: {_ROTATION_PER_FRAME_MIN}.')

  return rots

def _plot_best_shift(best, coeff, x, y, log_path):
  """Saves a plot of the best offset, fit data and x,y data.

  Args:
    best: x value of best fit data.
    coeff: 3-element np array of coefficients from a 2nd-order polyfit of
      (x, y).
    x: np array of x data that was fit.
    y: np array of y data that was fit.
    log_path: where to store data.
  """
  xfit = np.arange(x[0], x[-1], 0.05).tolist()
  yfit = [coeff[0]*x*x + coeff[1]*x + coeff[2] for x in xfit]
  pylab.figure()
  pylab.title(f'{_NAME} Gyro/Camera Time Correlation')
  pylab.plot(x, y, 'ro', label='data', alpha=0.7)
  pylab.plot(xfit, yfit, 'b', label='fit', alpha=0.7)
  pylab.plot(best, min(yfit), 'g*', label='best', markersize=10)
  pylab.ticklabel_format(axis='y', style='sci', scilimits=(-3, -3))
  pylab.xlabel('Relative horizontal shift between curves (ms)')
  pylab.ylabel('Correlation distance')
  pylab.legend()
  matplotlib.pyplot.savefig(
      '%s_plot_shifts.png' % os.path.join(log_path, _NAME))

def _plot_rotations(cam_rots, gyro_rots, log_path):
  """Saves a plot of the camera vs. gyro rotational measurements.

  Args:
    cam_rots: Array of camera rotation measurements (rads).
    gyro_rots: Array of gyro rotation measurements (rads).
    log_path: Location to store data.
  """
  # For plot, scale rotations to degrees.
  pylab.figure()
  pylab.title(f'{_NAME} Gyro/Camera Rotations')
  pylab.plot(range(len(cam_rots)), cam_rots*_RADS_TO_DEGS, '-r.',
             label='camera', alpha=0.7)
  pylab.plot(range(len(gyro_rots)), gyro_rots*_RADS_TO_DEGS, '-b.',
             label='gyro', alpha=0.7)
  pylab.xlabel('Camera frame number')
  pylab.ylabel('Angular displacement between adjacent camera frames (degrees)')
  pylab.xlim([0, len(cam_rots)])
  pylab.legend()
  matplotlib.pyplot.savefig(
      '%s_plot_rotations.png' % os.path.join(log_path, _NAME))

def load_data():
  """Load a set of previously captured data.

  Returns:
    events: Dictionary containing all gyro events and cam timestamps.
    frames: List of RGB images as numpy arrays.
    w: Pixel width of frames.
    h: Pixel height of frames.
  """
  with open(f'{_NAME}_events.txt', 'r') as f:
    events = json.loads(f.read())
  n = len(events['cam'])
  frames = []
  for i in range(n):
    img = image_processing_utils.read_image(f'{_NAME}_frame{i:03d}.png')
    w, h = img.size[0:2]
    frames.append(np.array(img).reshape((h, w, 3)) / 255)
  return events, frames, w, h

class SensorFusionTest(its_base_test.ItsBaseTest):
  """Tests if image and motion sensor events are well synchronized.

  Tests gyro and camera timestamp differences while the camera is rotating.
  The test description is in the SensorFusion.pdf file. The test rotates the
  phone in the prescribed manner and captures images.

  Camera rotation is determined from the images and from the gyroscope events.
  The timestamp offset between gyro and camera is determined using the scipy
  spatial correlation distance. The min value is determined as the optimum.

  PASS/FAIL based on the offset and also the correlation distance.
  """

  def _assert_gyro_encompasses_camera(self, cam_times, gyro_times):
    """Confirms the camera events are bounded by the gyroscope events.

    Also ensures:
      1. Camera frame range is less than _CAM_FRAME_RANGE_MAX. When the camera
         frame range is significantly larger than spec, the system is usually
         in a busy/bad state.
      2. Gyro samples per second are greater than _GYRO_SAMP_RATE_MIN.

    Args:
      cam_times: numpy array of camera times.
      gyro_times: List of 'gyro' times.
    """
    min_cam_time = min(cam_times) * _NSEC_TO_SEC
    max_cam_time = max(cam_times) * _NSEC_TO_SEC
    min_gyro_time = min(gyro_times) * _NSEC_TO_SEC
    max_gyro_time = max(gyro_times) * _NSEC_TO_SEC
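    # The camera window must lie strictly inside the gyro window so that gyro
    # rotations can be computed for every camera frame interval.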
    if not (min_cam_time > min_gyro_time and max_cam_time < max_gyro_time):
      raise AssertionError(
          f'Camera timestamps [{min_cam_time}, {max_cam_time}] not '
          f'enclosed by gyro timestamps [{min_gyro_time}, {max_gyro_time}]')

    cam_frame_range = max_cam_time - min_cam_time
    logging.debug('Camera frame range: %f seconds', cam_frame_range)

    gyro_time_range = max_gyro_time - min_gyro_time
    gyro_smp_per_sec = len(gyro_times) / gyro_time_range
    logging.debug('Gyro samples per second: %f', gyro_smp_per_sec)
    if cam_frame_range > _CAM_FRAME_RANGE_MAX:
      raise AssertionError(f'Camera frame range, {cam_frame_range}s, too high!')
    if gyro_smp_per_sec < _GYRO_SAMP_RATE_MIN:
      raise AssertionError(f'Gyro sample rate, {gyro_smp_per_sec}S/s, too low!')

  def test_sensor_fusion(self):
    # Determine if '--replay' is in the cmd line arguments.
    replay = False
    for s in list(sys.argv[1:]):
      if '--replay' in s:
        replay = True
        logging.info('test_sensor_fusion.py not run. Using existing data set.')

    rot_rig = {}
    fps = float(self.fps)
    img_w, img_h = self.img_w, self.img_h
    test_length = float(self.test_length)
    log_path = self.log_path
    chart_distance = self.chart_distance * _CM_TO_M

    if img_w * img_h > _IMG_SIZE_MAX or fps * test_length > _NUM_FRAMES_MAX:
      logging.debug(
          'Warning: Your test parameters may require fast write speeds'
          ' to run smoothly. If you run into problems, consider'
          " smaller values of 'w', 'h', 'fps', or 'test_length'.")

    if replay:
      events, frames, _, h = load_data()
    else:
      with its_session_utils.ItsSession(
          device_id=self.dut.serial,
          camera_id=self.camera_id,
          hidden_physical_id=self.hidden_physical_id) as cam:

        rot_rig['cntl'] = self.rotator_cntl
        rot_rig['ch'] = self.rotator_ch
        events, frames = _collect_data(cam, fps, img_w, img_h, test_length,
                                       rot_rig, chart_distance, log_path)
    logging.debug('Start frame: %d', _START_FRAME)

    sensor_fusion_utils.plot_gyro_events(events['gyro'], _NAME, log_path)

    # Validity check on gyro/camera timestamps
    cam_times = _get_cam_times(
        events['cam'][_START_FRAME:len(events['cam'])], fps)
    gyro_times = [e['time'] for e in events['gyro']]
    self._assert_gyro_encompasses_camera(cam_times, gyro_times)

    # Compute camera rotation displacement (rads) between adjacent frames.
    cam_rots = _get_cam_rotations(
        frames[_START_FRAME:len(frames)], events['facing'], img_h, log_path)
    logging.debug('cam_rots: %s', str(cam_rots))
    gyro_rots = sensor_fusion_utils.get_gyro_rotations(
        events['gyro'], cam_times)
    _plot_rotations(cam_rots, gyro_rots, log_path)

    # Find the best offset.
    offset_ms, coeffs, candidates, distances = sensor_fusion_utils.get_best_alignment_offset(
        cam_times, cam_rots, events['gyro'])
    _plot_best_shift(offset_ms, coeffs, candidates, distances, log_path)

    # Calculate correlation distance with best offset.
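    # scipy.spatial.distance.correlation is 1 - the Pearson correlation
    # coefficient, so 0 means the camera and gyro rotation curves match
    # perfectly.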
    corr_dist = scipy.spatial.distance.correlation(cam_rots, gyro_rots)
    logging.debug('Best correlation of %f at shift of %.3fms',
                  corr_dist, offset_ms)

    # Assert PASS/FAIL criteria.
    if corr_dist > _CORR_DIST_THRESH_MAX:
      raise AssertionError(f'Poor gyro/camera correlation: {corr_dist:.6f}, '
                           f'TOL: {_CORR_DIST_THRESH_MAX}.')
    if abs(offset_ms) > _OFFSET_MS_THRESH_MAX:
      raise AssertionError('Offset too large. Measured (ms): '
                           f'{offset_ms:.3f}, TOL: {_OFFSET_MS_THRESH_MAX}.')


if __name__ == '__main__':
  test_runner.main()