blob: 4004c4678f27007aeeb780e30c4e35f31357b50e [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Captures a video from an Android device."""
import argparse
import logging
import os
import threading
import time
import sys
if __name__ == '__main__':
sys.path.append(
os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..', '..')))
from devil.android import device_signal
from devil.android import device_utils
from devil.android.tools import script_common
from devil.utils import cmd_helper
from devil.utils import reraiser_thread
from devil.utils import timeout_retry
logger = logging.getLogger(__name__)
class VideoRecorder(object):
"""Records a screen capture video from an Android Device (KitKat or newer)."""
def __init__(self, device, megabits_per_second=4, size=None, rotate=False):
"""Creates a VideoRecorder instance.
Args:
device: DeviceUtils instance.
host_file: Path to the video file to store on the host.
megabits_per_second: Video bitrate in megabits per second. Allowed range
from 0.1 to 100 mbps.
size: Video frame size tuple (width, height) or None to use the device
default.
rotate: If True, the video will be rotated 90 degrees.
"""
self._bit_rate = megabits_per_second * 1000 * 1000
self._device = device
self._device_file = (
'%s/screen-recording.mp4' % device.GetAppWritablePath())
self._recorder_thread = None
self._rotate = rotate
self._size = size
self._started = threading.Event()
def __enter__(self):
self.Start()
def Start(self, timeout=None):
"""Start recording video."""
def screenrecord_started():
return bool(self._device.GetPids('screenrecord'))
if screenrecord_started():
raise Exception("Can't run multiple concurrent video captures.")
self._started.clear()
self._recorder_thread = reraiser_thread.ReraiserThread(self._Record)
self._recorder_thread.start()
timeout_retry.WaitFor(
screenrecord_started, wait_period=1, max_tries=timeout)
self._started.wait(timeout)
def _Record(self):
cmd = ['screenrecord', '--verbose', '--bit-rate', str(self._bit_rate)]
if self._rotate:
cmd += ['--rotate']
if self._size:
cmd += ['--size', '%dx%d' % self._size]
cmd += [self._device_file]
for line in self._device.adb.IterShell(
' '.join(cmd_helper.SingleQuote(i) for i in cmd), None):
if line.startswith('Content area is '):
self._started.set()
def __exit__(self, _exc_type, _exc_value, _traceback):
self.Stop()
def Stop(self):
"""Stop recording video."""
if not self._device.KillAll(
'screenrecord', signum=device_signal.SIGINT, quiet=True):
logger.warning('Nothing to kill: screenrecord was not running')
self._recorder_thread.join()
def Pull(self, host_file=None):
"""Pull resulting video file from the device.
Args:
host_file: Path to the video file to store on the host.
Returns:
Output video file name on the host.
"""
# TODO(jbudorick): Merge filename generation with the logic for doing so in
# DeviceUtils.
host_file_name = (host_file or 'screen-recording-%s-%s.mp4' % (str(
self._device), time.strftime('%Y%m%dT%H%M%S', time.localtime())))
host_file_name = os.path.abspath(host_file_name)
self._device.PullFile(self._device_file, host_file_name)
self._device.RemovePath(self._device_file, force=True)
return host_file_name
def main():
# Parse options.
parser = argparse.ArgumentParser(description=__doc__)
script_common.AddDeviceArguments(parser)
parser.add_argument(
'-f',
'--file',
metavar='FILE',
help='Save result to file instead of generating a '
'timestamped file name.')
parser.add_argument(
'-v', '--verbose', action='store_true', help='Verbose logging.')
parser.add_argument(
'-b',
'--bitrate',
default=4,
type=float,
help='Bitrate in megabits/s, from 0.1 to 100 mbps, '
'%(default)d mbps by default.')
parser.add_argument(
'-r', '--rotate', action='store_true', help='Rotate video by 90 degrees.')
parser.add_argument(
'-s',
'--size',
metavar='WIDTHxHEIGHT',
help='Frame size to use instead of the device '
'screen size.')
parser.add_argument(
'host_file',
nargs='?',
help='File to which the video capture will be written.')
args = parser.parse_args()
host_file = args.host_file or args.file
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
size = (tuple(int(i) for i in args.size.split('x')) if args.size else None)
def record_video(device, stop_recording):
recorder = VideoRecorder(
device, megabits_per_second=args.bitrate, size=size, rotate=args.rotate)
with recorder:
stop_recording.wait()
f = None
if host_file:
root, ext = os.path.splitext(host_file)
f = '%s_%s%s' % (root, str(device), ext)
f = recorder.Pull(f)
print 'Video written to %s' % os.path.abspath(f)
parallel_devices = device_utils.DeviceUtils.parallel(script_common.GetDevices(
args.devices, args.denylist_file),
asyn=True)
stop_recording = threading.Event()
running_recording = parallel_devices.pMap(record_video, stop_recording)
print 'Recording. Press Enter to stop.',
sys.stdout.flush()
raw_input()
stop_recording.set()
running_recording.pGet(None)
return 0
if __name__ == '__main__':
sys.exit(main())