ITS: add --replay feature to sensor_fusion
bug: 181365679
Change-Id: I6a472a2700bfe6586b61d84bd3664399a6edf669
diff --git a/apps/CameraITS/tests/sensor_fusion/test_sensor_fusion.py b/apps/CameraITS/tests/sensor_fusion/test_sensor_fusion.py
index d6dded3..44543dc 100644
--- a/apps/CameraITS/tests/sensor_fusion/test_sensor_fusion.py
+++ b/apps/CameraITS/tests/sensor_fusion/test_sensor_fusion.py
@@ -19,6 +19,7 @@
import math
import multiprocessing
import os
+import sys
import time
from matplotlib import pylab
@@ -366,6 +367,26 @@
'%s_plot_rotations.png' % os.path.join(log_path, _NAME))
+def load_data():
+ """Load a set of previously captured data.
+
+ Returns:
+ events: Dictionary containing all gyro events and cam timestamps.
+ frames: List of RGB images as numpy arrays.
+ w: Pixel width of frames
+ h: Pixel height of frames
+ """
+ with open(f'{_NAME}_events.txt', 'r') as f:
+ events = json.loads(f.read())
+ n = len(events['cam'])
+ frames = []
+ for i in range(n):
+ img = image_processing_utils.read_image(f'{_NAME}_frame{i:03d}.png')
+ w, h = img.size[0:2]
+ frames.append(np.array(img).reshape((h, w, 3)) / 255)
+ return events, frames, w, h
+
+
class SensorFusionTest(its_base_test.ItsBaseTest):
"""Tests if image and motion sensor events are well synchronized.
@@ -414,6 +435,13 @@
raise AssertionError(f'Gyro sample rate, {gyro_smp_per_sec}S/s, low!')
def test_sensor_fusion(self):
+ # Determine if '--replay' in cmd line
+ replay = False
+ for s in list(sys.argv[1:]):
+ if '--replay' in s:
+ replay = True
+ logging.info('test_sensor_fusion.py not run. Using existing data set.')
+
rot_rig = {}
fps = float(self.fps)
img_w, img_h = self.img_w, self.img_h
@@ -427,15 +455,18 @@
' to run smoothly. If you run into problems, consider'
" smaller values of 'w', 'h', 'fps', or 'test_length'.")
- with its_session_utils.ItsSession(
- device_id=self.dut.serial,
- camera_id=self.camera_id,
- hidden_physical_id=self.hidden_physical_id) as cam:
+ if replay:
+ events, frames, _, h = load_data()
+ else:
+ with its_session_utils.ItsSession(
+ device_id=self.dut.serial,
+ camera_id=self.camera_id,
+ hidden_physical_id=self.hidden_physical_id) as cam:
- rot_rig['cntl'] = self.rotator_cntl
- rot_rig['ch'] = self.rotator_ch
- events, frames = _collect_data(cam, fps, img_w, img_h, test_length,
- rot_rig, chart_distance, log_path)
+ rot_rig['cntl'] = self.rotator_cntl
+ rot_rig['ch'] = self.rotator_ch
+ events, frames = _collect_data(cam, fps, img_w, img_h, test_length,
+ rot_rig, chart_distance, log_path)
_plot_gyro_events(events['gyro'], log_path)
@@ -463,8 +494,8 @@
# Assert PASS/FAIL criteria.
if corr_dist > _CORR_DIST_THRESH_MAX:
- raise AssertionError(f'Poor gyro/camera correlation. '
- f'Corr: {corr_dist}, TOL: {_CORR_DIST_THRESH_MAX}.')
+ raise AssertionError(f'Poor gyro/camera correlation: {corr_dist:.6f}, '
+ f'TOL: {_CORR_DIST_THRESH_MAX}.')
if abs(offset_ms) > _OFFSET_MS_THRESH_MAX:
raise AssertionError('Offset too large. Measured (ms): '
f'{offset_ms:.3f}, TOL: {_OFFSET_MS_THRESH_MAX}.')
diff --git a/apps/CameraITS/utils/image_processing_utils.py b/apps/CameraITS/utils/image_processing_utils.py
index dd7ea4d..c041e24 100644
--- a/apps/CameraITS/utils/image_processing_utils.py
+++ b/apps/CameraITS/utils/image_processing_utils.py
@@ -446,6 +446,11 @@
raise error_util.CameraItsError('Unsupported image type')
+def read_image(fname):
+ """Read image function to match write_image() above."""
+ return Image.open(fname)
+
+
def apply_lut_to_image(img, lut):
"""Applies a LUT to every pixel in a float image array.