| # Copyright 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Utility functions for sensor_fusion hardware rig.""" |
| |
| |
| import bisect |
| import codecs |
| import logging |
| import struct |
| import time |
| import unittest |
| |
| import numpy as np |
| import scipy.spatial |
| import serial |
| from serial.tools import list_ports |
| |
| # Constants for Rotation Rig |
| ARDUINO_ANGLE_MAX = 180.0 # degrees |
| ARDUINO_BAUDRATE = 9600 |
| ARDUINO_CMD_LENGTH = 3 |
| ARDUINO_CMD_TIME = 2.0 * ARDUINO_CMD_LENGTH / ARDUINO_BAUDRATE # round trip |
| ARDUINO_PID = 0x0043 |
| ARDUINO_SERVO_SPEED_MAX = 255 |
| ARDUINO_SERVO_SPEED_MIN = 1 |
| ARDUINO_SPEED_START_BYTE = 253 |
| ARDUINO_START_BYTE = 255 |
| ARDUINO_START_NUM_TRYS = 3 |
| ARDUINO_TEST_CMD = (b'\x01', b'\x02', b'\x03') |
| ARDUINO_VALID_CH = ('1', '2', '3', '4', '5', '6') |
| ARDUINO_VIDS = (0x2341, 0x2a03) |
| |
| CANAKIT_BAUDRATE = 115200 |
| CANAKIT_CMD_TIME = 0.05 # seconds (found experimentally) |
| CANAKIT_DATA_DELIMITER = '\r\n' |
| CANAKIT_PID = 0xfc73 |
| CANAKIT_SEND_TIMEOUT = 0.02 # seconds |
| CANAKIT_SET_CMD = 'REL' |
| CANAKIT_SLEEP_TIME = 2 # seconds (for full 90 degree rotation) |
| CANAKIT_VALID_CMD = ('ON', 'OFF') |
| CANAKIT_VALID_CH = ('1', '2', '3', '4') |
| CANAKIT_VID = 0x04d8 |
| |
| HS755HB_ANGLE_MAX = 202.0 # throw for rotation motor in degrees |
| |
| _COARSE_FIT_RANGE = 20 # Range area around coarse fit to do optimization. |
| _CORR_TIME_OFFSET_MAX = 50 # ms max shift to try and match camera/gyro times. |
| _CORR_TIME_OFFSET_STEP = 0.5 # ms step for shifts. |
| |
| # Unit translators |
| _MSEC_TO_NSEC = 1000000 |
| _NSEC_TO_SEC = 1E-9 |
| _SEC_TO_NSEC = int(1/_NSEC_TO_SEC) |
| |
| |
| def serial_port_def(name): |
| """Determine the serial port and open. |
| |
| Args: |
| name: string of device to locate (ie. 'Arduino', 'Canakit' or 'Default') |
| Returns: |
| serial port object |
| """ |
| serial_port = None |
| devices = list_ports.comports() |
| for device in devices: |
| if not (device.vid and device.pid): # Not all comm ports have vid and pid |
| continue |
| if name.lower() == 'arduino': |
| if (device.vid in ARDUINO_VIDS and device.pid == ARDUINO_PID): |
| logging.debug('Arduino: %s', str(device)) |
| serial_port = device.device |
| return serial.Serial(serial_port, ARDUINO_BAUDRATE, timeout=1) |
| |
| elif name.lower() in ('canakit', 'default'): |
| if (device.vid == CANAKIT_VID and device.pid == CANAKIT_PID): |
| logging.debug('Canakit: %s', str(device)) |
| serial_port = device.device |
| return serial.Serial(serial_port, CANAKIT_BAUDRATE, |
| timeout=CANAKIT_SEND_TIMEOUT, |
| parity=serial.PARITY_EVEN, |
| stopbits=serial.STOPBITS_ONE, |
| bytesize=serial.EIGHTBITS) |
| raise ValueError(f'{name} device not connected.') |
| |
| |
| def canakit_cmd_send(canakit_serial_port, cmd_str): |
| """Wrapper for sending serial command to Canakit. |
| |
| Args: |
| canakit_serial_port: port to write for canakit |
| cmd_str: str; value to send to device. |
| """ |
| try: |
| logging.debug('writing port...') |
| canakit_serial_port.write(CANAKIT_DATA_DELIMITER.encode()) |
| time.sleep(CANAKIT_CMD_TIME) # This is critical for relay. |
| canakit_serial_port.write(cmd_str.encode()) |
| |
| except IOError: |
| raise IOError(f'Port {CANAKIT_VID}:{CANAKIT_PID} is not open!') |
| |
| |
| def canakit_set_relay_channel_state(canakit_port, ch, state): |
| """Set Canakit relay channel and state. |
| |
| Waits CANAKIT_SLEEP_TIME for rotation to occur. |
| |
| Args: |
| canakit_port: serial port object for the Canakit port. |
| ch: string for channel number of relay to set. '1', '2', '3', or '4' |
| state: string of either 'ON' or 'OFF' |
| """ |
| logging.debug('Setting relay state %s', state) |
| if ch in CANAKIT_VALID_CH and state in CANAKIT_VALID_CMD: |
| canakit_cmd_send(canakit_port, CANAKIT_SET_CMD + ch + '.' + state + '\r\n') |
| time.sleep(CANAKIT_SLEEP_TIME) |
| else: |
| logging.debug('Invalid ch (%s) or state (%s), no command sent.', ch, state) |
| |
| |
| def arduino_read_cmd(port): |
| """Read back Arduino command from serial port.""" |
| cmd = [] |
| for _ in range(ARDUINO_CMD_LENGTH): |
| cmd.append(port.read()) |
| return cmd |
| |
| |
| def arduino_send_cmd(port, cmd): |
| """Send command to serial port.""" |
| for i in range(ARDUINO_CMD_LENGTH): |
| port.write(cmd[i]) |
| |
| |
| def arduino_loopback_cmd(port, cmd): |
| """Send command to serial port.""" |
| arduino_send_cmd(port, cmd) |
| time.sleep(ARDUINO_CMD_TIME) |
| return arduino_read_cmd(port) |
| |
| |
| def establish_serial_comm(port): |
| """Establish connection with serial port.""" |
| logging.debug('Establishing communication with %s', port.name) |
| trys = 1 |
| hex_test = convert_to_hex(ARDUINO_TEST_CMD) |
| logging.debug(' test tx: %s %s %s', hex_test[0], hex_test[1], hex_test[2]) |
| while trys <= ARDUINO_START_NUM_TRYS: |
| cmd_read = arduino_loopback_cmd(port, ARDUINO_TEST_CMD) |
| hex_read = convert_to_hex(cmd_read) |
| logging.debug(' test rx: %s %s %s', hex_read[0], hex_read[1], hex_read[2]) |
| if cmd_read != list(ARDUINO_TEST_CMD): |
| trys += 1 |
| else: |
| logging.debug(' Arduino comm established after %d try(s)', trys) |
| break |
| |
| |
| def convert_to_hex(cmd): |
| return [('%0.2x' % int(codecs.encode(x, 'hex_codec'), 16) if x else '--') |
| for x in cmd] |
| |
| |
| def arduino_rotate_servo_to_angle(ch, angle, serial_port, move_time): |
| """Rotate servo to the specified angle. |
| |
| Args: |
| ch: str; servo to rotate in ARDUINO_VALID_CH |
| angle: int; servo angle to move to |
| serial_port: object; serial port |
| move_time: int; time in seconds |
| """ |
| if angle < 0 or angle > ARDUINO_ANGLE_MAX: |
| logging.debug('Angle must be between 0 and %d.', ARDUINO_ANGLE_MAX) |
| angle = 0 |
| if angle > ARDUINO_ANGLE_MAX: |
| angle = ARDUINO_ANGLE_MAX |
| |
| cmd = [struct.pack('B', i) for i in [ARDUINO_START_BYTE, int(ch), angle]] |
| arduino_send_cmd(serial_port, cmd) |
| time.sleep(move_time) |
| |
| |
| def arduino_rotate_servo(ch, angles, servo_speed, move_time, serial_port): |
| """Rotate servo through 'angles'. |
| |
| Args: |
| ch: str; servo to rotate |
| angles: list of ints; servo angles to move to |
| servo_speed: int; move speed between [1, 255] |
| move_time: int; time required to allow for arduino movement |
| serial_port: object; serial port |
| """ |
| |
| # set servo speed |
| logging.debug('Servo speed: %d', servo_speed) |
| set_servo_speed(ch, servo_speed, serial_port, delay=0) |
| |
| for angle in angles: |
| angle_norm = int(round(angle*ARDUINO_ANGLE_MAX/HS755HB_ANGLE_MAX, 0)) |
| arduino_rotate_servo_to_angle(ch, angle_norm, serial_port, move_time) |
| |
| |
| def rotation_rig(rotate_cntl, rotate_ch, num_rotations, angles, servo_speed, |
| move_time): |
| """Rotate the phone n times using rotate_cntl and rotate_ch defined. |
| |
| rotate_ch is hard wired and must be determined from physical setup. |
| |
| First initialize the port and send a test string defined by ARDUINO_TEST_CMD |
| to establish communications. Then rotate servo motor to origin position. |
| |
| Args: |
| rotate_cntl: str to identify as 'arduino' or 'canakit' controller. |
| rotate_ch: str to identify rotation channel number. |
| num_rotations: int number of rotations. |
| angles: list of ints; servo angle to move to. |
| servo_speed: int number of move speed between [1, 255]. |
| move_time: int time required to allow for arduino movement. |
| """ |
| |
| logging.debug('Controller: %s, ch: %s', rotate_cntl, rotate_ch) |
| if rotate_cntl.lower() == 'arduino': |
| # identify port |
| arduino_serial_port = serial_port_def('Arduino') |
| |
| # send test cmd to Arduino until cmd returns properly |
| establish_serial_comm(arduino_serial_port) |
| |
| # initialize servo at origin |
| logging.debug('Moving servo to origin') |
| arduino_rotate_servo_to_angle(rotate_ch, 0, arduino_serial_port, 1) |
| |
| elif rotate_cntl.lower() == 'canakit': |
| canakit_serial_port = serial_port_def('Canakit') |
| |
| else: |
| logging.info('No rotation rig defined. Manual test: rotate phone by hand.') |
| |
| # rotate phone |
| logging.debug('Rotating phone %dx', num_rotations) |
| for _ in range(num_rotations): |
| if rotate_cntl == 'arduino': |
| arduino_rotate_servo(rotate_ch, angles, servo_speed, move_time, |
| arduino_serial_port) |
| elif rotate_cntl == 'canakit': |
| canakit_set_relay_channel_state(canakit_serial_port, rotate_ch, 'ON') |
| canakit_set_relay_channel_state(canakit_serial_port, rotate_ch, 'OFF') |
| logging.debug('Finished rotations') |
| if rotate_cntl == 'arduino': |
| logging.debug('Moving servo to origin') |
| arduino_rotate_servo_to_angle(rotate_ch, 0, arduino_serial_port, 1) |
| |
| |
| def set_servo_speed(ch, servo_speed, serial_port, delay=0): |
| """Set servo to specified speed. |
| |
| Args: |
| ch: str; servo to turn on in ARDUINO_VALID_CH |
| servo_speed: int; value of speed between 1 and 255 |
| serial_port: object; serial port |
| delay: int; time in seconds |
| """ |
| if servo_speed < ARDUINO_SERVO_SPEED_MIN: |
| logging.debug('Servo speed must be >= %d.', ARDUINO_SERVO_SPEED_MIN) |
| servo_speed = ARDUINO_SERVO_SPEED_MIN |
| elif servo_speed > ARDUINO_SERVO_SPEED_MAX: |
| logging.debug('Servo speed must be <= %d.', ARDUINO_SERVO_SPEED_MAX) |
| servo_speed = ARDUINO_SERVO_SPEED_MAX |
| |
| cmd = [struct.pack('B', i) for i in [ARDUINO_SPEED_START_BYTE, |
| int(ch), servo_speed]] |
| arduino_send_cmd(serial_port, cmd) |
| time.sleep(delay) |
| |
| |
| def get_gyro_rotations(gyro_events, cam_times): |
| """Get the rotation values of the gyro. |
| |
| Integrates the gyro data between each camera frame to compute an angular |
| displacement. |
| |
| Args: |
| gyro_events: List of gyro event objects. |
| cam_times: Array of N camera times, one for each frame. |
| |
| Returns: |
| Array of N-1 gyro rotation measurements (rads/s). |
| """ |
| gyro_times = np.array([e['time'] for e in gyro_events]) |
| all_gyro_rots = np.array([e['z'] for e in gyro_events]) |
| gyro_rots = [] |
| if gyro_times[0] > cam_times[0] or gyro_times[-1] < cam_times[-1]: |
| raise AssertionError('Gyro times do not bound camera times! ' |
| f'gyro: {gyro_times[0]:.0f} -> {gyro_times[-1]:.0f} ' |
| f'cam: {cam_times[0]} -> {cam_times[-1]} (ns).') |
| # Integrate the gyro data between each pair of camera frame times. |
| for i_cam in range(len(cam_times)-1): |
| # Get the window of gyro samples within the current pair of frames. |
| # Note: bisect always picks first gyro index after camera time. |
| t_cam0 = cam_times[i_cam] |
| t_cam1 = cam_times[i_cam+1] |
| i_gyro_window0 = bisect.bisect(gyro_times, t_cam0) |
| i_gyro_window1 = bisect.bisect(gyro_times, t_cam1) |
| gyro_sum = 0 |
| |
| # Integrate samples within the window. |
| for i_gyro in range(i_gyro_window0, i_gyro_window1): |
| gyro_val = all_gyro_rots[i_gyro+1] |
| t_gyro0 = gyro_times[i_gyro] |
| t_gyro1 = gyro_times[i_gyro+1] |
| t_gyro_delta = (t_gyro1 - t_gyro0) * _NSEC_TO_SEC |
| gyro_sum += gyro_val * t_gyro_delta |
| |
| # Handle the fractional intervals at the sides of the window. |
| for side, i_gyro in enumerate([i_gyro_window0-1, i_gyro_window1]): |
| gyro_val = all_gyro_rots[i_gyro+1] |
| t_gyro0 = gyro_times[i_gyro] |
| t_gyro1 = gyro_times[i_gyro+1] |
| t_gyro_delta = (t_gyro1 - t_gyro0) * _NSEC_TO_SEC |
| if side == 0: |
| f = (t_cam0 - t_gyro0) / (t_gyro1 - t_gyro0) |
| frac_correction = gyro_val * t_gyro_delta * (1.0 - f) |
| gyro_sum += frac_correction |
| else: |
| f = (t_cam1 - t_gyro0) / (t_gyro1 - t_gyro0) |
| frac_correction = gyro_val * t_gyro_delta * f |
| gyro_sum += frac_correction |
| |
| gyro_rots.append(gyro_sum) |
| gyro_rots = np.array(gyro_rots) |
| return gyro_rots |
| |
| |
| def get_best_alignment_offset(cam_times, cam_rots, gyro_events): |
| """Find the best offset to align the camera and gyro motion traces. |
| |
| This function integrates the shifted gyro data between camera samples |
| for a range of candidate shift values, and returns the shift that |
| result in the best correlation. |
| |
| Uses a correlation distance metric between the curves, where a smaller |
| value means that the curves are better-correlated. |
| |
| Fits a curve to the correlation distance data to measure the minima more |
| accurately, by looking at the correlation distances within a range of |
| +/- 10ms from the measured best score; note that this will use fewer |
| than the full +/- 10 range for the curve fit if the measured score |
| (which is used as the center of the fit) is within 10ms of the edge of |
| the +/- 50ms candidate range. |
| |
| Args: |
| cam_times: Array of N camera times, one for each frame. |
| cam_rots: Array of N-1 camera rotation displacements (rad). |
| gyro_events: List of gyro event objects. |
| |
| Returns: |
| Best alignment offset(ms), fit coefficients, candidates, and distances. |
| """ |
| # Measure the correlation distance over defined shift |
| shift_candidates = np.arange(-_CORR_TIME_OFFSET_MAX, |
| _CORR_TIME_OFFSET_MAX+_CORR_TIME_OFFSET_STEP, |
| _CORR_TIME_OFFSET_STEP).tolist() |
| spatial_distances = [] |
| for shift in shift_candidates: |
| shifted_cam_times = cam_times + shift*_MSEC_TO_NSEC |
| gyro_rots = get_gyro_rotations(gyro_events, shifted_cam_times) |
| spatial_distance = scipy.spatial.distance.correlation(cam_rots, gyro_rots) |
| logging.debug('shift %.1fms spatial distance: %.5f', shift, |
| spatial_distance) |
| spatial_distances.append(spatial_distance) |
| |
| best_corr_dist = min(spatial_distances) |
| coarse_best_shift = shift_candidates[spatial_distances.index(best_corr_dist)] |
| logging.debug('Best shift without fitting is %.4f ms', coarse_best_shift) |
| |
| # Fit a 2nd order polynomial around coarse_best_shift to extract best fit |
| i = spatial_distances.index(best_corr_dist) |
| i_poly_fit_min = i - _COARSE_FIT_RANGE |
| i_poly_fit_max = i + _COARSE_FIT_RANGE + 1 |
| shift_candidates = shift_candidates[i_poly_fit_min:i_poly_fit_max] |
| spatial_distances = spatial_distances[i_poly_fit_min:i_poly_fit_max] |
| fit_coeffs = np.polyfit(shift_candidates, spatial_distances, 2) # ax^2+bx+c |
| exact_best_shift = -fit_coeffs[1]/(2*fit_coeffs[0]) |
| if abs(coarse_best_shift - exact_best_shift) > 2.0: |
| raise AssertionError( |
| f'Test failed. Bad fit to time-shift curve. Coarse best shift: ' |
| f'{coarse_best_shift}, Exact best shift: {exact_best_shift}.') |
| if fit_coeffs[0] <= 0 or fit_coeffs[2] <= 0: |
| raise AssertionError( |
| f'Coefficients are < 0: a: {fit_coeffs[0]}, c: {fit_coeffs[2]}.') |
| |
| return exact_best_shift, fit_coeffs, shift_candidates, spatial_distances |
| |
| |
| class SensorFusionUtilsTests(unittest.TestCase): |
| """Run a suite of unit tests on this module.""" |
| |
| _CAM_FRAME_TIME = 30 * _MSEC_TO_NSEC # Similar to 30FPS |
| _CAM_ROT_AMPLITUDE = 0.04 # Empirical number for rotation per frame (rads/s). |
| |
| def _generate_pwl_waveform(self, pts, step, amplitude): |
| """Helper function to generate piece wise linear waveform.""" |
| pwl_waveform = [] |
| for t in range(pts[0], pts[1], step): |
| pwl_waveform.append(0) |
| for t in range(pts[1], pts[2], step): |
| pwl_waveform.append((t-pts[1])/(pts[2]-pts[1])*amplitude) |
| for t in range(pts[2], pts[3], step): |
| pwl_waveform.append(amplitude) |
| for t in range(pts[3], pts[4], step): |
| pwl_waveform.append((pts[4]-t)/(pts[4]-pts[3])*amplitude) |
| for t in range(pts[4], pts[5], step): |
| pwl_waveform.append(0) |
| for t in range(pts[5], pts[6], step): |
| pwl_waveform.append((-1*(t-pts[5])/(pts[6]-pts[5]))*amplitude) |
| for t in range(pts[6], pts[7], step): |
| pwl_waveform.append(-1*amplitude) |
| for t in range(pts[7], pts[8], step): |
| pwl_waveform.append((t-pts[8])/(pts[8]-pts[7])*amplitude) |
| for t in range(pts[8], pts[9], step): |
| pwl_waveform.append(0) |
| return pwl_waveform |
| |
| def _generate_test_waveforms(self, gyro_sampling_rate, t_offset=0): |
| """Define ideal camera/gryo behavior. |
| |
| Args: |
| gyro_sampling_rate: Value in samples/sec. |
| t_offset: Value in ns for gyro/camera timing offset. |
| |
| Returns: |
| cam_times: numpy array of camera times N values long. |
| cam_rots: numpy array of camera rotations N-1 values long. |
| gyro_events: list of dicts of gyro events N*gyro_sampling_rate/30 long. |
| |
| Round trip for motor is ~2 seconds (~60 frames) |
| 1111111111111111 |
| i i |
| i i |
| i i |
| 0000 0000 0000 |
| i i |
| i i |
| i i |
| -1-1-1-1-1-1-1-1 |
| t_0 t_1 t_2 t_3 t_4 t_5 t_6 t_7 t_8 t_9 |
| |
| Note gyro waveform must extend +/- _CORR_TIME_OFFSET_MAX to enable shifting |
| of camera waveform to find best correlation. |
| |
| """ |
| |
| t_ramp = 4 * self._CAM_FRAME_TIME |
| pts = {} |
| pts[0] = 3 * self._CAM_FRAME_TIME |
| pts[1] = pts[0] + 3 * self._CAM_FRAME_TIME |
| pts[2] = pts[1] + t_ramp |
| pts[3] = pts[2] + 32 * self._CAM_FRAME_TIME |
| pts[4] = pts[3] + t_ramp |
| pts[5] = pts[4] + 4 * self._CAM_FRAME_TIME |
| pts[6] = pts[5] + t_ramp |
| pts[7] = pts[6] + 32 * self._CAM_FRAME_TIME |
| pts[8] = pts[7] + t_ramp |
| pts[9] = pts[8] + 4 * self._CAM_FRAME_TIME |
| cam_times = np.array(range(pts[0], pts[9], self._CAM_FRAME_TIME)) |
| cam_rots = self._generate_pwl_waveform( |
| pts, self._CAM_FRAME_TIME, self._CAM_ROT_AMPLITUDE) |
| cam_rots.pop() # rots is N-1 for N length times. |
| cam_rots = np.array(cam_rots) |
| |
| # Generate gyro waveform. |
| gyro_step = int(round(_SEC_TO_NSEC/gyro_sampling_rate, 0)) |
| gyro_pts = {k: v+t_offset+self._CAM_FRAME_TIME//2 for k, v in pts.items()} |
| gyro_pts[0] = 0 # adjust end pts to bound camera |
| gyro_pts[9] += self._CAM_FRAME_TIME*2 # adjust end pt to bound camera |
| gyro_rot_amplitude = ( |
| self._CAM_ROT_AMPLITUDE / self._CAM_FRAME_TIME * _SEC_TO_NSEC) |
| gyro_rots = self._generate_pwl_waveform( |
| gyro_pts, gyro_step, gyro_rot_amplitude) |
| |
| # Create gyro events list of dicts. |
| gyro_events = [] |
| for i, t in enumerate(range(gyro_pts[0], gyro_pts[9], gyro_step)): |
| gyro_events.append({'time': t, 'z': gyro_rots[i]}) |
| |
| return cam_times, cam_rots, gyro_events |
| |
| def test_get_gyro_rotations(self): |
| """Tests that gyro rotations are masked properly by camera rotations. |
| |
| Note that waveform ideal waveform generation only works properly with |
| integer multiples of frame rate. |
| """ |
| # Run with different sampling rates to validate. |
| for gyro_sampling_rate in [200, 1000]: # 6x, 30x frame rate |
| cam_times, cam_rots, gyro_events = self._generate_test_waveforms( |
| gyro_sampling_rate) |
| gyro_rots = get_gyro_rotations(gyro_events, cam_times) |
| e_msg = f'gyro sampling rate = {gyro_sampling_rate}\n' |
| e_msg += f'cam_times = {list(cam_times)}\n' |
| e_msg += f'cam_rots = {list(cam_rots)}\n' |
| e_msg += f'gyro_rots = {list(gyro_rots)}' |
| |
| self.assertTrue(np.allclose( |
| gyro_rots, cam_rots, atol=self._CAM_ROT_AMPLITUDE*0.10), e_msg) |
| |
| def test_get_best_alignment_offset(self): |
| """Unittest for alignment offset check.""" |
| |
| gyro_sampling_rate = 5000 |
| for t_offset_ms in [0, 1]: # Run with different offsets to validate. |
| t_offset = int(t_offset_ms * _MSEC_TO_NSEC) |
| cam_times, cam_rots, gyro_events = self._generate_test_waveforms( |
| gyro_sampling_rate, t_offset) |
| |
| best_fit_offset, coeffs, x, y = get_best_alignment_offset( |
| cam_times, cam_rots, gyro_events) |
| e_msg = f'best: {best_fit_offset} ms\n' |
| e_msg += f'coeffs: {coeffs}\n' |
| e_msg += f'x: {x}\n' |
| e_msg += f'y: {y}' |
| self.assertTrue(np.isclose(t_offset_ms, best_fit_offset, atol=0.1), e_msg) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |
| |