| """Experimentally determines a camera's rolling shutter skew. |
| |
| See the accompanying PDF for instructions on how to use this test. |
| """ |
| from __future__ import division |
| from __future__ import print_function |
| |
| import argparse |
| import glob |
| import math |
| import os |
| import sys |
| import tempfile |
| |
| import cv2 |
| import its.caps |
| import its.device |
| import its.image |
| import its.objects |
| import numpy as np |
| |
| DEBUG = False |
| |
| # Constants for which direction the camera is facing. |
| FACING_FRONT = 0 |
| FACING_BACK = 1 |
| FACING_EXTERNAL = 2 |
| |
| # Camera capture defaults. |
| FPS = 30 |
| WIDTH = 640 |
| HEIGHT = 480 |
| TEST_LENGTH = 1 |
| |
| # Each circle in a cluster must be within this many pixels of some other circle |
| # in the cluster. |
| CLUSTER_DISTANCE = 50.0 / HEIGHT |
| # A cluster must consist of at least this percentage of the total contours for |
| # it to be allowed into the computation. |
| MAJORITY_THRESHOLD = 0.7 |
| |
| # Constants to make sure the slope of the fitted line is reasonable. |
| SLOPE_MIN_THRESHOLD = 0.5 |
| SLOPE_MAX_THRESHOLD = 1.5 |
| |
| # To improve readability of unit conversions. |
| SEC_TO_NSEC = float(10**9) |
| MSEC_TO_NSEC = float(10**6) |
| NSEC_TO_MSEC = 1.0 / float(10**6) |
| |
| |
| class RollingShutterArgumentParser(object): |
| """Parses command line arguments for the rolling shutter test.""" |
| |
| def __init__(self): |
| self.__parser = argparse.ArgumentParser( |
| description='Run rolling shutter test') |
| self.__parser.add_argument( |
| '-d', '--debug', |
| action='store_true', |
| help='print and write data useful for debugging') |
| self.__parser.add_argument( |
| '-f', '--fps', |
| type=int, |
| help='FPS to capture with during the test (defaults to 30)') |
| self.__parser.add_argument( |
| '-i', '--img_size', |
| help=('comma-separated dimensions of captured images (defaults ' |
| 'to 640x480). Example: --img_size=<width>,<height>')) |
| self.__parser.add_argument( |
| '-l', '--led_time', |
| type=float, |
| required=True, |
| help=('how many milliseconds each column of the LED array is ' |
| 'lit for')) |
| self.__parser.add_argument( |
| '-p', '--panel_distance', |
| type=float, |
| help='how far the LED panel is from the camera (in meters)') |
| self.__parser.add_argument( |
| '-r', '--read_dir', |
| help=('read existing test data from specified directory. If ' |
| 'not specified, new test data is collected from the ' |
| 'device\'s camera)')) |
| self.__parser.add_argument( |
| '--device_id', |
| help=('device ID for device being tested (can also use ' |
| '\'device=<DEVICE ID>\')')) |
| self.__parser.add_argument( |
| '-t', '--test_length', |
| type=int, |
| help=('how many seconds the test should run for (defaults to 1 ' |
| 'second)')) |
| self.__parser.add_argument( |
| '-o', '--debug_dir', |
| help=('write debugging information in a folder in the ' |
| 'specified directory. Otherwise, the system\'s default ' |
| 'location for temporary folders is used. --debug must ' |
| 'be specified along with this argument.')) |
| |
| def parse_args(self): |
| """Returns object containing parsed values from the command line.""" |
| # Don't show argparse the 'device' flag, since it's in a different |
| # format than the others (to maintain CameraITS conventions) and it will |
| # complain. |
| filtered_args = [arg for arg in sys.argv[1:] if 'device=' not in arg] |
| args = self.__parser.parse_args(filtered_args) |
| if args.device_id: |
| # If argparse format is used, convert it to a format its.device can |
| # use later on. |
| sys.argv.append('device=%s' % args.device_id) |
| return args |
| |
| |
| def main(): |
| global DEBUG |
| global CLUSTER_DISTANCE |
| |
| parser = RollingShutterArgumentParser() |
| args = parser.parse_args() |
| |
| DEBUG = args.debug |
| if not DEBUG and args.debug_dir: |
| print('argument --debug_dir requires --debug', file=sys.stderr) |
| sys.exit() |
| |
| if args.read_dir is None: |
| # Collect new data. |
| raw_caps, reported_skew = collect_data(args) |
| frames = [its.image.convert_capture_to_rgb_image(c) for c in raw_caps] |
| else: |
| # Load existing data. |
| frames, reported_skew = load_data(args.read_dir) |
| |
| # Make the cluster distance relative to the height of the image. |
| (frame_h, _, _) = frames[0].shape |
| CLUSTER_DISTANCE = frame_h * CLUSTER_DISTANCE |
| debug_print('Setting cluster distance to %spx.' % CLUSTER_DISTANCE) |
| |
| if DEBUG: |
| debug_dir = setup_debug_dir(args.debug_dir) |
| # Write raw frames. |
| for i, img in enumerate(frames): |
| its.image.write_image(img, '%s/raw/%03d.png' % (debug_dir, i)) |
| else: |
| debug_dir = None |
| |
| avg_shutter_skew, num_frames_used = find_average_shutter_skew( |
| frames, args.led_time, debug_dir) |
| if debug_dir: |
| # Write the reported skew with the raw images, so the directory can also |
| # be used to read from. |
| with open(debug_dir + '/raw/reported_skew.txt', 'w') as f: |
| f.write('%sms\n' % reported_skew) |
| |
| if avg_shutter_skew is None: |
| print('Could not find usable frames.') |
| else: |
| print('Device reported shutter skew of %sms.' % reported_skew) |
| print('Measured shutter skew is %sms (averaged over %s frames).' % |
| (avg_shutter_skew, num_frames_used)) |
| |
| |
| def collect_data(args): |
| """Capture a new set of frames from the device's camera. |
| |
| Args: |
| args: Parsed command line arguments. |
| |
| Returns: |
| A list of RGB images as numpy arrays. |
| """ |
| fps = args.fps if args.fps else FPS |
| if args.img_size: |
| w, h = map(int, args.img_size.split(',')) |
| else: |
| w, h = WIDTH, HEIGHT |
| test_length = args.test_length if args.test_length else TEST_LENGTH |
| |
| with its.device.ItsSession() as cam: |
| props = cam.get_camera_properties() |
| its.caps.skip_unless(its.caps.manual_sensor(props)) |
| facing = props['android.lens.facing'] |
| if facing != FACING_FRONT and facing != FACING_BACK: |
| print('Unknown lens facing %s' % facing) |
| assert 0 |
| |
| fmt = {'format': 'yuv', 'width': w, 'height': h} |
| s, e, _, _, _ = cam.do_3a(get_results=True, do_af=False) |
| req = its.objects.manual_capture_request(s, e) |
| req['android.control.aeTargetFpsRange'] = [fps, fps] |
| |
| # Convert from milliseconds to nanoseconds. We only want enough |
| # exposure time to saturate approximately one column. |
| exposure_time = (args.led_time / 2.0) * MSEC_TO_NSEC |
| print('Using exposure time of %sns.' % exposure_time) |
| req['android.sensor.exposureTime'] = exposure_time |
| req["android.sensor.frameDuration"] = int(SEC_TO_NSEC / fps); |
| |
| if args.panel_distance is not None: |
| # Convert meters to diopters and use that for the focus distance. |
| req['android.lens.focusDistance'] = 1 / args.panel_distance |
| print('Starting capture') |
| raw_caps = cam.do_capture([req]*fps*test_length, fmt) |
| print('Finished capture') |
| |
| # Convert from nanoseconds to milliseconds. |
| shutter_skews = {c['metadata']['android.sensor.rollingShutterSkew'] * |
| NSEC_TO_MSEC for c in raw_caps} |
| # All frames should have same rolling shutter skew. |
| assert len(shutter_skews) == 1 |
| shutter_skew = list(shutter_skews)[0] |
| |
| return raw_caps, shutter_skew |
| |
| |
| def load_data(dir_name): |
| """Reads camera frame data from an existing directory. |
| |
| Args: |
| dir_name: Name of the directory to read data from. |
| |
| Returns: |
| A list of RGB images as numpy arrays. |
| """ |
| frame_files = glob.glob('%s/*.png' % dir_name) |
| frames = [] |
| for frame_file in sorted(frame_files): |
| frames.append(its.image.load_rgb_image(frame_file)) |
| with open('%s/reported_skew.txt' % dir_name, 'r') as f: |
| reported_skew = f.readline()[:-2] # Strip off 'ms' suffix |
| return frames, reported_skew |
| |
| |
| def find_average_shutter_skew(frames, led_time, debug_dir=None): |
| """Finds the average shutter skew using the given frames. |
| |
| Frames without enough information will be discarded from the average to |
| improve overall accuracy. |
| |
| Args: |
| frames: List of RGB images from the camera being tested. |
| led_time: How long a single LED column is lit for (in milliseconds). |
| debug_dir: (optional) Directory to write debugging information to. |
| |
| Returns: |
| The average calculated shutter skew and the number of frames used to |
| calculate the average. |
| """ |
| avg_shutter_skew = 0.0 |
| avg_slope = 0.0 |
| weight = 0.0 |
| num_frames_used = 0 |
| |
| for i, frame in enumerate(frames): |
| debug_print('------------------------') |
| debug_print('| PROCESSING FRAME %03d |' % i) |
| debug_print('------------------------') |
| shutter_skew, confidence, slope = calculate_shutter_skew( |
| frame, led_time, i, debug_dir=debug_dir) |
| if shutter_skew is None: |
| debug_print('Skipped frame.') |
| else: |
| debug_print('Shutter skew is %sms (confidence: %s).' % |
| (shutter_skew, confidence)) |
| # Use the confidence to weight the average. |
| avg_shutter_skew += shutter_skew * confidence |
| avg_slope += slope * confidence |
| weight += confidence |
| num_frames_used += 1 |
| |
| debug_print('\n') |
| if num_frames_used == 0: |
| return None, None |
| else: |
| avg_shutter_skew /= weight |
| avg_slope /= weight |
| slope_err_str = ('The average slope of the fitted line was too %s ' |
| 'to get an accurate measurement (slope was %s). ' |
| 'Try making the LED panel %s.') |
| if avg_slope < SLOPE_MIN_THRESHOLD: |
| print(slope_err_str % ('flat', avg_slope, 'slower'), |
| file=sys.stderr) |
| elif avg_slope > SLOPE_MAX_THRESHOLD: |
| print(slope_err_str % ('steep', avg_slope, 'faster'), |
| file=sys.stderr) |
| return avg_shutter_skew, num_frames_used |
| |
| |
| def calculate_shutter_skew(frame, led_time, frame_num=None, debug_dir=None): |
| """Calculates the shutter skew of the camera being used for this test. |
| |
| Args: |
| frame: A single RGB image captured by the camera being tested. |
| led_time: How long a single LED column is lit for (in milliseconds). |
| frame_num: (optional) Number of the given frame. |
| debug_dir: (optional) Directory to write debugging information to. |
| |
| Returns: |
| The shutter skew (in milliseconds), the confidence in the accuracy of |
| the measurement (useful for weighting averages), and the slope of the |
| fitted line. |
| """ |
| contours, scratch_img, contour_img, mono_img = find_contours(frame.copy()) |
| if debug_dir is not None: |
| cv2.imwrite('%s/contour/%03d.png' % (debug_dir, frame_num), contour_img) |
| cv2.imwrite('%s/mono/%03d.png' % (debug_dir, frame_num), mono_img) |
| |
| largest_cluster, cluster_percentage = find_largest_cluster(contours, |
| scratch_img) |
| if largest_cluster is None: |
| debug_print('No majority cluster found.') |
| return None, None, None |
| elif len(largest_cluster) <= 1: |
| debug_print('Majority cluster was too small.') |
| return None, None, None |
| debug_print('%s points in the largest cluster.' % len(largest_cluster)) |
| |
| np_cluster = np.array([[c.x, c.y] for c in largest_cluster]) |
| [vx], [vy], [x0], [y0] = cv2.fitLine(np_cluster, cv2.cv.CV_DIST_L2, |
| 0, 0.01, 0.01) |
| slope = vy / vx |
| debug_print('Slope is %s.' % slope) |
| (frame_h, frame_w, _) = frame.shape |
| # Draw line onto scratch frame. |
| pt1 = tuple(map(int, (x0 - vx * 1000, y0 - vy * 1000))) |
| pt2 = tuple(map(int, (x0 + vx * 1000, y0 + vy * 1000))) |
| cv2.line(scratch_img, pt1, pt2, (0, 255, 255), thickness=3) |
| |
| # We only need the width of the cluster. |
| _, _, cluster_w, _ = find_cluster_bounding_rect(largest_cluster, |
| scratch_img) |
| |
| num_columns = find_num_columns_spanned(largest_cluster) |
| debug_print('%s columns spanned by cluster.' % num_columns) |
| # How long it takes for a column to move from the left of the bounding |
| # rectangle to the right. |
| left_to_right_time = led_time * num_columns |
| milliseconds_per_x_pixel = left_to_right_time / cluster_w |
| # The distance between the line's intersection at the top of the frame and |
| # the intersection at the bottom. |
| x_range = frame_h / slope |
| shutter_skew = milliseconds_per_x_pixel * x_range |
| # If the aspect ratio is different from 4:3 (the aspect ratio of the actual |
| # sensor), we need to correct, because it will be cropped. |
| shutter_skew *= (float(frame_w) / float(frame_h)) / (4.0 / 3.0) |
| |
| if debug_dir is not None: |
| cv2.imwrite('%s/scratch/%03d.png' % (debug_dir, frame_num), |
| scratch_img) |
| |
| return shutter_skew, cluster_percentage, slope |
| |
| |
| def find_contours(img): |
| """Finds contours in the given image. |
| |
| Args: |
| img: Image in Android camera RGB format. |
| |
| Returns: |
| OpenCV-formatted contours, the original image in OpenCV format, a |
| thresholded image with the contours drawn on, and a grayscale version of |
| the image. |
| """ |
| # Convert to format OpenCV can work with (BGR ordering with byte-ranged |
| # values). |
| img *= 255 |
| img = img.astype(np.uint8) |
| img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) |
| |
| # Since the LED colors for the panel we're using are red, we can get better |
| # contours for the LEDs if we ignore the green and blue channels. This also |
| # makes it so we don't pick up the blue control screen of the LED panel. |
| red_img = img[:, :, 2] |
| _, thresh = cv2.threshold(red_img, 0, 255, cv2.THRESH_BINARY + |
| cv2.THRESH_OTSU) |
| |
| # Remove noise before finding contours by eroding the thresholded image and |
| # then re-dilating it. The size of the kernel represents how many |
| # neighboring pixels to consider for the result of a single pixel. |
| kernel = np.ones((3, 3), np.uint8) |
| opening = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=2) |
| |
| if DEBUG: |
| # Need to convert it back to BGR if we want to draw colored contours. |
| contour_img = cv2.cvtColor(opening, cv2.COLOR_GRAY2BGR) |
| else: |
| contour_img = None |
| contours, _ = cv2.findContours(opening, |
| cv2.cv.CV_RETR_EXTERNAL, |
| cv2.cv.CV_CHAIN_APPROX_NONE) |
| if DEBUG: |
| cv2.drawContours(contour_img, contours, -1, (0, 0, 255), thickness=2) |
| return contours, img, contour_img, red_img |
| |
| |
| def convert_to_circles(contours): |
| """Converts given contours into circle objects. |
| |
| Args: |
| contours: Contours generated by OpenCV. |
| |
| Returns: |
| A list of circles. |
| """ |
| |
| class Circle(object): |
| """Holds data to uniquely define a circle.""" |
| |
| def __init__(self, contour): |
| self.x = int(np.mean(contour[:, 0, 0])) |
| self.y = int(np.mean(contour[:, 0, 1])) |
| # Get diameters of each axis then half it. |
| x_r = (np.max(contour[:, 0, 0]) - np.min(contour[:, 0, 0])) / 2.0 |
| y_r = (np.max(contour[:, 0, 1]) - np.min(contour[:, 0, 1])) / 2.0 |
| # Average x radius and y radius to get the approximate radius for |
| # the given contour. |
| self.r = (x_r + y_r) / 2.0 |
| assert self.r > 0.0 |
| |
| def distance_to(self, other): |
| return (math.sqrt((other.x - self.x)**2 + (other.y - self.y)**2) - |
| self.r - other.r) |
| |
| def intersects(self, other): |
| return self.distance_to(other) <= 0.0 |
| |
| return list(map(Circle, contours)) |
| |
| |
| def find_largest_cluster(contours, frame): |
| """Finds the largest cluster in the given contours. |
| |
| Args: |
| contours: Contours generated by OpenCV. |
| frame: For drawing debugging information onto. |
| |
| Returns: |
| The cluster with the most contours in it and the percentage of all |
| contours that the cluster contains. |
| """ |
| clusters = proximity_clusters(contours) |
| |
| if not clusters: |
| return None, None # No clusters found. |
| |
| largest_cluster = max(clusters, key=len) |
| cluster_percentage = len(largest_cluster) / len(contours) |
| |
| if cluster_percentage < MAJORITY_THRESHOLD: |
| return None, None |
| |
| if DEBUG: |
| # Draw largest cluster on scratch frame. |
| for circle in largest_cluster: |
| cv2.circle(frame, (int(circle.x), int(circle.y)), int(circle.r), |
| (0, 255, 0), thickness=2) |
| |
| return largest_cluster, cluster_percentage |
| |
| |
| def proximity_clusters(contours): |
| """Sorts the given contours into groups by distance. |
| |
| Converts every given contour to a circle and clusters by adding a circle to |
| a cluster only if it is close to at least one other circle in the cluster. |
| |
| TODO: Make algorithm faster (currently O(n**2)). |
| |
| Args: |
| contours: Contours generated by OpenCV. |
| |
| Returns: |
| A list of clusters, where each cluster is a list of the circles |
| contained in the cluster. |
| """ |
| circles = convert_to_circles(contours) |
| |
| # Use disjoint-set data structure to store assignments. Start every point |
| # in their own cluster. |
| cluster_assignments = [-1 for i in range(len(circles))] |
| |
| def get_canonical_index(i): |
| if cluster_assignments[i] >= 0: |
| index = get_canonical_index(cluster_assignments[i]) |
| # Collapse tree for better runtime. |
| cluster_assignments[i] = index |
| return index |
| else: |
| return i |
| |
| def get_cluster_size(i): |
| return -cluster_assignments[get_canonical_index(i)] |
| |
| for i, curr in enumerate(circles): |
| close_circles = [j for j, p in enumerate(circles) if i != j and |
| curr.distance_to(p) < CLUSTER_DISTANCE] |
| if close_circles: |
| # Note: largest_cluster is an index into cluster_assignments. |
| largest_cluster = min(close_circles, key=get_cluster_size) |
| largest_size = get_cluster_size(largest_cluster) |
| curr_index = get_canonical_index(i) |
| curr_size = get_cluster_size(i) |
| if largest_size > curr_size: |
| # largest_cluster is larger than us. |
| target_index = get_canonical_index(largest_cluster) |
| # Add our cluster size to the bigger one. |
| cluster_assignments[target_index] -= curr_size |
| # Reroute our group to the bigger one. |
| cluster_assignments[curr_index] = target_index |
| else: |
| # We're the largest (or equal to the largest) cluster. Reroute |
| # all groups to us. |
| for j in close_circles: |
| smaller_size = get_cluster_size(j) |
| smaller_index = get_canonical_index(j) |
| if smaller_index != curr_index: |
| # We only want to modify clusters that aren't already in |
| # the current one. |
| |
| # Add the smaller cluster's size to ours. |
| cluster_assignments[curr_index] -= smaller_size |
| # Reroute their group to us. |
| cluster_assignments[smaller_index] = curr_index |
| |
| # Convert assignments list into list of clusters. |
| clusters_dict = {} |
| for i in range(len(cluster_assignments)): |
| canonical_index = get_canonical_index(i) |
| if canonical_index not in clusters_dict: |
| clusters_dict[canonical_index] = [] |
| clusters_dict[canonical_index].append(circles[i]) |
| return clusters_dict.values() |
| |
| |
| def find_cluster_bounding_rect(cluster, scratch_frame): |
| """Finds the minimum rectangle that bounds the given cluster. |
| |
| The bounding rectangle will always be axis-aligned. |
| |
| Args: |
| cluster: Cluster being used to find the bounding rectangle. |
| scratch_frame: Image that rectangle is drawn onto for debugging |
| purposes. |
| |
| Returns: |
| The leftmost and topmost x and y coordinates, respectively, along with |
| the width and height of the rectangle. |
| """ |
| avg_distance = find_average_neighbor_distance(cluster) |
| debug_print('Average distance between points in largest cluster is %s ' |
| 'pixels.' % avg_distance) |
| |
| c_x = min(cluster, key=lambda c: c.x - c.r) |
| c_y = min(cluster, key=lambda c: c.y - c.r) |
| c_w = max(cluster, key=lambda c: c.x + c.r) |
| c_h = max(cluster, key=lambda c: c.y + c.r) |
| |
| x = c_x.x - c_x.r - avg_distance |
| y = c_y.y - c_y.r - avg_distance |
| w = (c_w.x + c_w.r + avg_distance) - x |
| h = (c_h.y + c_h.r + avg_distance) - y |
| |
| if DEBUG: |
| points = np.array([[x, y], [x + w, y], [x + w, y + h], [x, y + h]], |
| np.int32) |
| cv2.polylines(scratch_frame, [points], True, (255, 0, 0), thickness=2) |
| |
| return x, y, w, h |
| |
| |
| def find_average_neighbor_distance(cluster): |
| """Finds the average distance between every circle and its closest neighbor. |
| |
| Args: |
| cluster: List of circles |
| |
| Returns: |
| The average distance. |
| """ |
| avg_distance = 0.0 |
| for a in cluster: |
| closest_point = None |
| closest_dist = None |
| for b in cluster: |
| if a is b: |
| continue |
| curr_dist = a.distance_to(b) |
| if closest_point is None or curr_dist < closest_dist: |
| closest_point = b |
| closest_dist = curr_dist |
| avg_distance += closest_dist |
| avg_distance /= len(cluster) |
| return avg_distance |
| |
| |
| def find_num_columns_spanned(circles): |
| """Finds how many columns of the LED panel are spanned by the given circles. |
| |
| Args: |
| circles: List of circles (assumed to be from the LED panel). |
| |
| Returns: |
| The number of columns spanned. |
| """ |
| if not circles: |
| return 0 |
| |
| def x_intersects(c_a, c_b): |
| return abs(c_a.x - c_b.x) < (c_a.r + c_b.r) |
| |
| circles = sorted(circles, key=lambda c: c.x) |
| last_circle = circles[0] |
| num_columns = 1 |
| for circle in circles[1:]: |
| if not x_intersects(circle, last_circle): |
| last_circle = circle |
| num_columns += 1 |
| |
| return num_columns |
| |
| |
| def setup_debug_dir(dir_name=None): |
| """Creates a debug directory and required subdirectories. |
| |
| Each subdirectory contains images from a different step in the process. |
| |
| Args: |
| dir_name: The directory to create. If none is specified, a temp |
| directory is created. |
| |
| Returns: |
| The name of the directory that is used. |
| """ |
| if dir_name is None: |
| dir_name = tempfile.mkdtemp() |
| else: |
| force_mkdir(dir_name) |
| print('Saving debugging files to "%s"' % dir_name) |
| # For original captured images. |
| force_mkdir(dir_name + '/raw', clean=True) |
| # For monochrome images. |
| force_mkdir(dir_name + '/mono', clean=True) |
| # For contours generated from monochrome images. |
| force_mkdir(dir_name + '/contour', clean=True) |
| # For post-contour debugging information. |
| force_mkdir(dir_name + '/scratch', clean=True) |
| return dir_name |
| |
| |
| def force_mkdir(dir_name, clean=False): |
| """Creates a directory if it doesn't already exist. |
| |
| Args: |
| dir_name: Name of the directory to create. |
| clean: (optional) If set to true, cleans image files from the |
| directory (if it already exists). |
| """ |
| if os.path.exists(dir_name): |
| if clean: |
| for image in glob.glob('%s/*.png' % dir_name): |
| os.remove(image) |
| else: |
| os.makedirs(dir_name) |
| |
| |
| def debug_print(s, *args, **kwargs): |
| """Only prints if the test is running in debug mode.""" |
| if DEBUG: |
| print(s, *args, **kwargs) |
| |
| |
| if __name__ == '__main__': |
| main() |