blob: 5c046f0dd87b511221a89de8472c854ec4a306bc [file]
# Copyright 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for verifying preview stabilization.
"""
import cv2
import logging
import os
import threading
import time
import camera_properties_utils
import gen2_rig_controller_utils
import its_session_utils
import sensor_fusion_utils
import video_processing_utils
_AREA_720P_VIDEO = 1280 * 720
_ASPECT_RATIO_16_9 = 16/9 # determine if preview fmt > 16:9
_ASPECT_TOL = 0.01
_GREEN_TOL = 200 # 200 out of 255 Green value in RGB
_GREEN_PERCENT = 95
_HIGH_RES_SIZE = '3840x2160' # Resolution for 4K quality
_IMG_FORMAT = 'png'
_MIN_PHONE_MOVEMENT_ANGLE = 5 # degrees
_NUM_ROTATIONS = 24
_PREVIEW_DURATION = 400 # milliseconds
_PREVIEW_MAX_TESTED_AREA = 1920 * 1440
_PREVIEW_MIN_TESTED_AREA = 320 * 240
_PREVIEW_STABILIZATION_FACTOR = 0.7 # 70% of gyro movement allowed
_RED_BLUE_TOL = 20 # 20 out of 255 Red or Blue value in RGB
# TODO: b/431221553 - see if _SKIP_INITIAL_FRAMES can be removed
_SKIP_INITIAL_FRAMES = 10
_START_FRAME = 30 # give 3A some frames to warm up
_GYRO_DELAY_TIME = 2.0 # seconds
_VIDEO_DELAY_TIME = 5.5 # seconds
_VIDEO_DURATION = 5.5 # seconds
def get_720p_or_above_size(supported_preview_sizes):
"""Returns the smallest size above or equal to 720p in preview and video.
If the largest preview size is under 720P, returns the largest value.
Args:
supported_preview_sizes: list; preview sizes.
e.g. ['1920x960', '1600x1200', '1920x1080']
Returns:
smallest size >= 720p video format
"""
size_to_area = lambda s: int(s.split('x')[0])*int(s.split('x')[1])
smallest_area = float('inf')
smallest_720p_or_above_size = ''
largest_supported_preview_size = ''
largest_area = 0
for size in supported_preview_sizes:
area = size_to_area(size)
if smallest_area > area >= _AREA_720P_VIDEO:
smallest_area = area
smallest_720p_or_above_size = size
else:
if area > largest_area:
largest_area = area
largest_supported_preview_size = size
if largest_area > _AREA_720P_VIDEO:
logging.debug('Smallest 720p or above size: %s',
smallest_720p_or_above_size)
return smallest_720p_or_above_size
else:
logging.debug('Largest supported preview size: %s',
largest_supported_preview_size)
return largest_supported_preview_size
def collect_data(cam, tablet_device, preview_size, stabilize, rot_rig,
zoom_ratio=None, fps_range=None, hlg10=False, ois=False):
"""Capture a new set of data from the device.
Captures camera preview frames while the user is moving the device in
the prescribed manner.
Args:
cam: camera object.
tablet_device: boolean; based on config file.
preview_size: str; preview stream resolution. ex. '1920x1080'
stabilize: boolean; whether preview stabilization is ON.
rot_rig: dict with 'cntl' and 'ch' defined.
zoom_ratio: float; static zoom ratio. None if default zoom.
fps_range: list; target fps range.
hlg10: boolean; whether to capture hlg10 output.
ois: boolean; whether optical image stabilization is ON.
Returns:
recording object; a dictionary containing output path, video size, etc.
"""
output_surfaces = cam.preview_surface(preview_size, hlg10)
video_stream_index = 0
stabilize_mode = camera_properties_utils.STABILIZATION_MODE_OFF
if stabilize:
stabilize_mode = camera_properties_utils.STABILIZATION_MODE_PREVIEW
return collect_data_with_surfaces(cam, tablet_device, output_surfaces,
video_stream_index, stabilize_mode, rot_rig,
zoom_ratio, fps_range, ois)
def collect_data_with_surfaces(cam, tablet_device, output_surfaces,
video_stream_index, stabilize_mode, rot_rig,
zoom_ratio=None, fps_range=None, ois=False):
"""Capture a new set of data from the device.
Captures camera preview frames while the user is moving the device in
the prescribed manner.
Args:
cam: camera object.
tablet_device: boolean; based on config file.
output_surfaces: list of dict; The list of output surfaces configured for
the recording. Only the first surface is used for recording; the rest are
configured, but not requested.
video_stream_index: The index of output surface used for recording
stabilize_mode: int; Video stabilization mode.
rot_rig: dict with 'cntl' and 'ch' defined.
zoom_ratio: float; static zoom ratio. None if default zoom.
fps_range: list; target fps range.
ois: boolean; whether optical image stabilization is ON.
Returns:
recording object; a dictionary containing output path, video size, etc.
"""
logging.debug('Starting sensor event collection')
# Start camera vibration
if rot_rig['cntl'] == 'gen2_rotator':
logging.debug('using gen2_rotator')
p = threading.Thread(
target=gen2_rig_controller_utils.rotation_rig,
args=(
rot_rig['cntl'],
rot_rig['ch'],
_NUM_ROTATIONS,
sensor_fusion_utils.ARDUINO_ANGLES_STABILIZATION,
),
kwargs={'port': rot_rig.get('port')}
)
else:
logging.debug('using sensor_fusion rotator')
if tablet_device:
servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION_TABLET
else:
servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION
p = threading.Thread(
target=sensor_fusion_utils.rotation_rig,
args=(
rot_rig['cntl'],
rot_rig['ch'],
_NUM_ROTATIONS,
sensor_fusion_utils.ARDUINO_ANGLES_STABILIZATION,
servo_speed,
sensor_fusion_utils.ARDUINO_MOVE_TIME_STABILIZATION,
),
)
p.start()
# Allow time for rig to move to starting position
time.sleep(_GYRO_DELAY_TIME)
cam.start_sensor_events()
# Allow time for rig to start moving
time.sleep(_VIDEO_DELAY_TIME - _GYRO_DELAY_TIME)
# Record video and return recording object
min_fps = fps_range[0] if (fps_range is not None) else None
max_fps = fps_range[1] if (fps_range is not None) else None
recording_obj = cam.do_preview_recording_multiple_surfaces(
output_surfaces, video_stream_index, _VIDEO_DURATION, stabilize_mode, ois,
zoom_ratio=zoom_ratio, ae_target_fps_min=min_fps,
ae_target_fps_max=max_fps
)
logging.debug('Recorded output path: %s', recording_obj['recordedOutputPath'])
logging.debug('Tested quality: %s', recording_obj['quality'])
# Wait for vibration to stop
p.join()
return recording_obj
def verify_stabilization(recording_obj, gyro_events, test_name,
log_path, facing, zoom_ratio=None,
stabilization_mode=True):
"""Verify the returned recording is properly stabilized.
Args:
recording_obj: Camcorder recording object.
gyro_events: Gyroscope events collected while recording.
test_name: Name of the test.
log_path: Path for the log file.
facing: Facing of the camera device.
zoom_ratio: Static zoom ratio. None if default zoom.
stabilization_mode: boolean; Whether stabilization mode is ON.
Returns:
A dictionary containing the maximum gyro angle, the maximum camera angle,
and a failure message if the recorded video isn't properly stablilized.
"""
file_name = recording_obj['recordedOutputPath'].split('/')[-1]
logging.debug('recorded file name: %s', file_name)
video_size = recording_obj['videoSize']
logging.debug('video size: %s', video_size)
# Get all frames from the video
file_list = video_processing_utils.extract_all_frames_from_video(
log_path, file_name, _IMG_FORMAT
)
logging.debug('Number of frames %d', len(file_list))
# Extract camera rotations
if zoom_ratio:
zoom_ratio_suffix = f'{zoom_ratio:.1f}'
else:
zoom_ratio_suffix = '1'
file_name_stem = (
f'{os.path.join(log_path, test_name)}_{video_size}_{zoom_ratio_suffix}x'
f'_stabilization={stabilization_mode}')
cam_rots = sensor_fusion_utils.get_cam_rotations_from_files(
file_list[_START_FRAME:],
facing,
file_name_stem,
log_path,
_START_FRAME,
stabilized_video=stabilization_mode
)
sensor_fusion_utils.plot_camera_rotations(cam_rots, _START_FRAME,
video_size, file_name_stem)
max_camera_angle = sensor_fusion_utils.calc_max_rotation_angle(
cam_rots, 'Camera')
# Extract gyro rotations
sensor_fusion_utils.plot_gyro_events(
gyro_events,
f'{test_name}_{video_size}_{zoom_ratio_suffix}x'
f'_stabilization={stabilization_mode}',
log_path
)
gyro_rots = sensor_fusion_utils.conv_acceleration_to_movement(
gyro_events, _VIDEO_DELAY_TIME)
max_gyro_angle = sensor_fusion_utils.calc_max_rotation_angle(
gyro_rots, 'Gyro')
logging.debug('Stabilization mode: %s', stabilization_mode)
logging.debug(
'Max deflection (degrees) %s: video: %.3f, gyro: %.3f ratio: %.4f',
video_size, max_camera_angle, max_gyro_angle,
max_camera_angle / max_gyro_angle)
# Assert phone is moved enough during test
if max_gyro_angle < _MIN_PHONE_MOVEMENT_ANGLE:
raise AssertionError(
f'Phone not moved enough! Movement: {max_gyro_angle}, '
f'THRESH: {_MIN_PHONE_MOVEMENT_ANGLE} degrees')
w_x_h = video_size.split('x')
if int(w_x_h[0])/int(w_x_h[1]) > _ASPECT_RATIO_16_9:
stabilization_factor = _PREVIEW_STABILIZATION_FACTOR * 1.1
else:
stabilization_factor = _PREVIEW_STABILIZATION_FACTOR
failure_msg = None
if max_camera_angle >= max_gyro_angle * stabilization_factor:
# Fail if stabilization mode is on
if stabilization_mode:
failure_msg = (
f'{video_size} not stabilized enough! '
f'Max camera angle: {max_camera_angle:.3f}, '
f'Max gyro angle: {max_gyro_angle:.3f}, '
f'ratio: {max_camera_angle/max_gyro_angle:.3f} '
f'THRESH: {stabilization_factor}.')
else:
# Fail if stabilization mode is off
if not stabilization_mode:
failure_msg = (
f'{video_size} is stabilized when testing stabilization=OFF! '
f'Max camera angle: {max_camera_angle:.3f}, '
f'Max gyro angle: {max_gyro_angle:.3f}, '
f'ratio: {max_camera_angle/max_gyro_angle:.3f} '
f'THRESH: {stabilization_factor}.')
# Delete saved frames if the format is a PASS
if not failure_msg:
for file in file_list:
try:
os.remove(os.path.join(log_path, file))
except FileNotFoundError:
logging.debug('File Not Found: %s', str(file))
logging.debug('Format %s passes, frame images removed', video_size)
return {'gyro': max_gyro_angle, 'cam': max_camera_angle,
'failure': failure_msg}
def collect_preview_data_with_zoom(cam, preview_size, zoom_start,
zoom_end, step_size, recording_duration_ms,
padded_frames=False):
"""Captures a preview video from the device.
Captures camera preview frames from the passed device.
Args:
cam: camera object.
preview_size: str; preview resolution. ex. '1920x1080'.
zoom_start: (float) is the starting zoom ratio during recording.
zoom_end: (float) is the ending zoom ratio during recording.
step_size: (float) is the step for zoom ratio during recording.
recording_duration_ms: preview recording duration in ms.
padded_frames: boolean; Whether to add additional frames at the beginning
and end of recording to workaround issue with MediaRecorder.
Returns:
recording object as described by cam.do_preview_recording_with_dynamic_zoom.
"""
recording_obj = cam.do_preview_recording_with_dynamic_zoom(
preview_size,
stabilize=False,
sweep_zoom=(zoom_start, zoom_end, step_size, recording_duration_ms),
padded_frames=padded_frames
)
logging.debug('Recorded output path: %s', recording_obj['recordedOutputPath'])
logging.debug('Tested quality: %s', recording_obj['quality'])
return recording_obj
def is_aspect_ratio_match(size_str, target_ratio):
"""Checks if a resolution string matches the target aspect ratio."""
width, height = map(int, size_str.split('x'))
return abs(width / height - target_ratio) < _ASPECT_TOL
def get_max_preview_test_size(cam, camera_id, aspect_ratio=None,
max_tested_area=_PREVIEW_MAX_TESTED_AREA):
"""Finds the max preview size to be tested.
If the device supports the _HIGH_RES_SIZE preview size then
it uses that for testing, otherwise uses the max supported
preview size capped at max_tested_area.
Args:
cam: camera object
camera_id: str; camera device id under test
aspect_ratio: preferred aspect_ratio For example: '4/3'
max_tested_area: area of max preview resolution
Returns:
preview_test_size: str; wxh resolution of the size to be tested
"""
resolution_to_area = lambda s: int(s.split('x')[0])*int(s.split('x')[1])
supported_preview_sizes = cam.get_all_supported_preview_sizes(
camera_id, filter_recordable=True)
logging.debug('Resolutions supported by preview and MediaRecorder: %s',
supported_preview_sizes)
if aspect_ratio is None:
supported_preview_sizes = [size for size in supported_preview_sizes
if resolution_to_area(size)
>= video_processing_utils.LOWEST_RES_TESTED_AREA]
else:
supported_preview_sizes = [size for size in supported_preview_sizes
if resolution_to_area(size)
>= video_processing_utils.LOWEST_RES_TESTED_AREA
and is_aspect_ratio_match(size, aspect_ratio)]
logging.debug('Supported preview resolutions: %s', supported_preview_sizes)
if _HIGH_RES_SIZE in supported_preview_sizes:
preview_test_size = _HIGH_RES_SIZE
else:
capped_supported_preview_sizes = [
size
for size in supported_preview_sizes
if (
resolution_to_area(size) <= max_tested_area
and resolution_to_area(size) >= _PREVIEW_MIN_TESTED_AREA
)
]
logging.debug('Capped preview resolutions: %s',
capped_supported_preview_sizes)
preview_test_size = capped_supported_preview_sizes[-1]
logging.debug('Selected preview resolution: %s', preview_test_size)
return preview_test_size
def get_max_extension_preview_test_size(cam, camera_id, extension):
"""Finds the max preview size for an extension to be tested.
If the device supports the _HIGH_RES_SIZE preview size then
it uses that for testing, otherwise uses the max supported
preview size capped at _PREVIEW_MAX_TESTED_AREA.
Args:
cam: camera object
camera_id: str; camera device id under test
extension: int; camera extension mode under test
Returns:
preview_test_size: str; wxh resolution of the size to be tested
"""
resolution_to_area = lambda s: int(s.split('x')[0])*int(s.split('x')[1])
supported_preview_sizes = (
cam.get_supported_extension_preview_sizes(camera_id, extension))
supported_preview_sizes = [size for size in supported_preview_sizes
if resolution_to_area(size)
>= video_processing_utils.LOWEST_RES_TESTED_AREA]
logging.debug('Supported preview resolutions for extension %d: %s',
extension, supported_preview_sizes)
if _HIGH_RES_SIZE in supported_preview_sizes:
preview_test_size = _HIGH_RES_SIZE
else:
capped_supported_preview_sizes = [
size
for size in supported_preview_sizes
if (
resolution_to_area(size) <= _PREVIEW_MAX_TESTED_AREA
and resolution_to_area(size) >= _PREVIEW_MIN_TESTED_AREA
)
]
preview_test_size = capped_supported_preview_sizes[-1]
logging.debug('Selected preview resolution: %s', preview_test_size)
return preview_test_size
def is_image_green(image_path):
"""Checks if an image is mostly green.
Checks if an image is mostly green by ensuring green is dominant
and red/blue values are low.
Args:
image_path: str; The path to the image file.
Returns:
bool: True if mostly green, False otherwise.
"""
image = cv2.imread(image_path)
green_pixels = ((image[:, :, 1] > _GREEN_TOL) &
(image[:, :, 0] < _RED_BLUE_TOL) &
(image[:, :, 2] < _RED_BLUE_TOL)).sum()
green_percentage = (green_pixels / (image.shape[0] * image.shape[1])) * 100
if green_percentage >= _GREEN_PERCENT:
return True
else:
return False
def preview_over_zoom_range(dut, cam, preview_size, z_min, z_max, z_step_size,
log_path):
"""Captures a preview video from the device over zoom range.
Captures camera preview frames at various zoom level in zoom range.
Args:
dut: device under test
cam: camera object
preview_size: str; preview resolution. ex. '1920x1080'
z_min: minimum zoom for preview capture
z_max: maximum zoom for preview capture
z_step_size: zoom step size from min to max
log_path: str; path for video file directory
Returns:
capture_results: total capture results of each frame
file_list: file name for each frame
"""
logging.debug('z_min : %.2f, z_max = %.2f, z_step_size = %.2f',
z_min, z_max, z_step_size)
# Converge 3A
cam.do_3a()
# recording preview
# TODO: b/350821827 - encode time stamps in camera frames instead of
# padded green frams
# MediaRecorder on some devices drop last few frames. To solve this issue
# add green frames as padding at the end of recorded camera frames. This way
# green buffer frames would be droped by MediaRecorder instead of actual
# frames. Later these green padded frames are removed.
preview_rec_obj = collect_preview_data_with_zoom(
cam, preview_size, z_min, z_max, z_step_size,
_PREVIEW_DURATION, padded_frames=True)
preview_file_name = its_session_utils.pull_file_from_dut(
dut, preview_rec_obj, log_path)
logging.debug('recorded video size : %s',
str(preview_rec_obj['videoSize']))
# Extract frames as png from mp4 preview recording
file_list = video_processing_utils.extract_all_frames_from_video(
log_path, preview_file_name, _IMG_FORMAT
)
first_camera_frame_idx = 0
last_camera_frame_idx = len(file_list)
# Find index of the first-non green frame
for (idx, file_name) in enumerate(file_list):
file_path = os.path.join(log_path, file_name)
if is_image_green(file_path):
its_session_utils.remove_file(file_path)
logging.debug('Removed green file %s', file_name)
else:
logging.debug('First camera frame: %s', file_name)
first_camera_frame_idx = idx
break
# Find index of last non-green frame
for (idx, file_name) in reversed(list(enumerate(file_list))):
file_path = os.path.join(log_path, file_name)
if is_image_green(file_path):
its_session_utils.remove_file(file_path)
logging.debug('Removed green file %s', file_name)
else:
logging.debug('Last camera frame: %s', file_name)
last_camera_frame_idx = idx
break
logging.debug('start idx = %d -- end idx = %d', first_camera_frame_idx,
last_camera_frame_idx)
file_list = file_list[first_camera_frame_idx:last_camera_frame_idx+1]
# Raise error if capture result and frame count doesn't match
capture_results = preview_rec_obj['captureMetadata']
extra_capture_result_count = len(capture_results) - len(file_list)
logging.debug('Number of frames %d', len(file_list))
if extra_capture_result_count != 0:
its_session_utils.remove_frame_files(log_path)
e_msg = (f'Number of CaptureResult ({len(capture_results)}) '
f'vs number of Frames ({len(file_list)}) count mismatch.'
' Retry Test.')
raise AssertionError(e_msg)
# skip frames which might not have 3A converged
capture_results = capture_results[_SKIP_INITIAL_FRAMES:]
skipped_files = file_list[:_SKIP_INITIAL_FRAMES]
file_list = file_list[_SKIP_INITIAL_FRAMES:]
# delete skipped files
for file_name in skipped_files:
its_session_utils.remove_file(os.path.join(log_path, file_name))
return capture_results, file_list