| # Copyright 2016 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import os |
| import os.path |
| import re |
| import subprocess |
| import sys |
| import tempfile |
| import time |
| |
| import its.device |
| import numpy |
| |
| SCENE_NAME = 'sensor_fusion' |
| SKIP_RET_CODE = 101 |
| TEST_NAME = 'test_sensor_fusion' |
| TEST_DIR = os.path.join(os.getcwd(), 'tests', SCENE_NAME) |
| W, H = 640, 480 |
| |
| # For finding best correlation shifts from test output logs. |
| SHIFT_RE = re.compile('^Best correlation of [0-9.]+ at shift of [-0-9.]+ms$') |
| # For finding lines that indicate socket issues in failed test runs. |
| SOCKET_FAIL_RE = re.compile( |
| r'.*((socket\.(error|timeout))|(Problem with socket)).*') |
| |
| FPS = 30 |
| TEST_LENGTH = 7 # seconds |
| |
| |
| def main(): |
| """Run test_sensor_fusion NUM_RUNS times. |
| |
| Save intermediate files and produce a summary/report of the results. |
| |
| Script should be run from the top-level CameraITS directory. |
| |
| Command line arguments: |
| camera: Camera(s) to be tested. Use comma to separate multiple |
| camera Ids. Ex: 'camera=0,1' or 'camera=1' |
| device: Device id for adb |
| fps: FPS to capture with during the test |
| img_size: Comma-separated dimensions of captured images (defaults to |
| 640x480). Ex: 'img_size=<width>,<height>' |
| num_runs: Number of times to repeat the test |
| rotator: String for rotator id in for vid:pid:ch |
| test_length: How long the test should run for (in seconds) |
| tmp_dir: Location of temp directory for output files |
| """ |
| |
| camera_id = '0' |
| fps = str(FPS) |
| img_size = '%s,%s' % (W, H) |
| num_runs = 1 |
| rotator_ids = 'default' |
| test_length = str(TEST_LENGTH) |
| tmp_dir = None |
| for s in sys.argv[1:]: |
| if s[:7] == 'camera=' and len(s) > 7: |
| camera_id = s[7:] |
| if s[:4] == 'fps=' and len(s) > 4: |
| fps = s[4:] |
| elif s[:9] == 'img_size=' and len(s) > 9: |
| img_size = s[9:] |
| elif s[:9] == 'num_runs=' and len(s) > 9: |
| num_runs = int(s[9:]) |
| elif s[:8] == 'rotator=' and len(s) > 8: |
| rotator_ids = s[8:] |
| elif s[:12] == 'test_length=' and len(s) > 12: |
| test_length = s[12:] |
| elif s[:8] == 'tmp_dir=' and len(s) > 8: |
| tmp_dir = s[8:] |
| |
| # Make output directories to hold the generated files. |
| tmpdir = tempfile.mkdtemp(dir=tmp_dir) |
| print 'Saving output files to:', tmpdir, '\n' |
| |
| device_id = its.device.get_device_id() |
| device_id_arg = 'device=' + device_id |
| print 'Testing device ' + device_id |
| |
| # ensure camera_id is valid |
| avail_camera_ids = find_avail_camera_ids(device_id_arg, tmpdir) |
| if camera_id not in avail_camera_ids: |
| print 'Need to specify valid camera_id in ', avail_camera_ids |
| sys.exit() |
| |
| camera_id_arg = 'camera=' + camera_id |
| if rotator_ids: |
| rotator_id_arg = 'rotator=' + rotator_ids |
| print 'Preparing to run sensor_fusion on camera', camera_id |
| |
| img_size_arg = 'img_size=' + img_size |
| print 'Image dimensions are ' + 'x'.join(img_size.split(',')) |
| |
| fps_arg = 'fps=' + fps |
| test_length_arg = 'test_length=' + test_length |
| print 'Capturing at %sfps' % fps |
| |
| os.mkdir(os.path.join(tmpdir, camera_id)) |
| |
| # Run test "num_runs" times, capturing stdout and stderr. |
| num_pass = 0 |
| num_fail = 0 |
| num_skip = 0 |
| num_socket_fails = 0 |
| num_non_socket_fails = 0 |
| shift_list = [] |
| for i in range(num_runs): |
| os.mkdir(os.path.join(tmpdir, camera_id, SCENE_NAME+'_'+str(i))) |
| cmd = 'python tools/rotation_rig.py rotator=%s' % rotator_ids |
| subprocess.Popen(cmd.split()) |
| cmd = ['python', os.path.join(TEST_DIR, TEST_NAME+'.py'), |
| device_id_arg, camera_id_arg, rotator_id_arg, img_size_arg, |
| fps_arg, test_length_arg] |
| outdir = os.path.join(tmpdir, camera_id, SCENE_NAME+'_'+str(i)) |
| outpath = os.path.join(outdir, TEST_NAME+'_stdout.txt') |
| errpath = os.path.join(outdir, TEST_NAME+'_stderr.txt') |
| t0 = time.time() |
| with open(outpath, 'w') as fout, open(errpath, 'w') as ferr: |
| retcode = subprocess.call( |
| cmd, stderr=ferr, stdout=fout, cwd=outdir) |
| t1 = time.time() |
| |
| if retcode == 0: |
| retstr = 'PASS ' |
| time_shift = find_time_shift(outpath) |
| shift_list.append(time_shift) |
| num_pass += 1 |
| elif retcode == SKIP_RET_CODE: |
| retstr = 'SKIP ' |
| num_skip += 1 |
| else: |
| retstr = 'FAIL ' |
| time_shift = find_time_shift(outpath) |
| if time_shift is None: |
| if is_socket_fail(errpath): |
| num_socket_fails += 1 |
| else: |
| num_non_socket_fails += 1 |
| else: |
| shift_list.append(time_shift) |
| num_fail += 1 |
| msg = '%s %s/%s [%.1fs]' % (retstr, SCENE_NAME, TEST_NAME, t1-t0) |
| print msg |
| |
| if num_pass == 1: |
| print 'Best shift is %sms' % shift_list[0] |
| elif num_pass > 1: |
| shift_arr = numpy.array(shift_list) |
| mean, std = numpy.mean(shift_arr), numpy.std(shift_arr) |
| print 'Best shift mean is %sms with std. dev. of %sms' % (mean, std) |
| |
| pass_percentage = 100*float(num_pass+num_skip)/num_runs |
| print '%d / %d tests passed (%.1f%%)' % (num_pass+num_skip, |
| num_runs, |
| pass_percentage) |
| |
| if num_socket_fails != 0: |
| print '%s failure(s) due to socket issues' % num_socket_fails |
| if num_non_socket_fails != 0: |
| print '%s non-socket failure(s)' % num_non_socket_fails |
| |
| |
| def is_socket_fail(err_file_path): |
| """Search through a test run's stderr log for any mention of socket issues. |
| |
| Args: |
| err_file_path: File path for stderr logs to search through |
| |
| Returns: |
| True if the test run failed and it was due to socket issues. Otherwise, |
| False. |
| """ |
| return find_matching_line(err_file_path, SOCKET_FAIL_RE) is not None |
| |
| |
| def find_time_shift(out_file_path): |
| """Search through a test run's stdout log for the best time shift. |
| |
| Args: |
| out_file_path: File path for stdout logs to search through |
| |
| Returns: |
| The best time shift, if one is found. Otherwise, returns None. |
| """ |
| line = find_matching_line(out_file_path, SHIFT_RE) |
| if line is None: |
| return None |
| else: |
| words = line.split(' ') |
| # Get last word and strip off 'ms\n' before converting to a float. |
| return float(words[-1][:-3]) |
| |
| |
| def find_matching_line(file_path, regex): |
| """Search each line in the file at 'file_path' for a line matching 'regex'. |
| |
| Args: |
| file_path: File path for file being searched |
| regex: Regex used to match against lines |
| |
| Returns: |
| The first matching line. If none exists, returns None. |
| """ |
| with open(file_path) as f: |
| for line in f: |
| if regex.match(line): |
| return line |
| return None |
| |
| def find_avail_camera_ids(device_id_arg, tmpdir): |
| """Find the available camera IDs. |
| |
| Args: |
| devices_id_arg(str): device=### |
| tmpdir(str): generated tmp dir for run |
| Returns: |
| list of available cameras |
| """ |
| with its.device.ItsSession() as cam: |
| avail_camera_ids = cam.get_camera_ids() |
| return avail_camera_ids |
| |
| |
| if __name__ == '__main__': |
| main() |
| |