blob: abc094c4c6ca59a974182bdd23dc43dfe3502e8e [file] [log] [blame]
# -*- coding: utf-8 -*-
"""ADB stress test utilities."""
from multiprocessing import pool
import argparse
import os
import subprocess
import sys
import time
import platform
def print_progress(perc, prefix='',
suffix='', decimals=1, bar_len=100):
"""Call in a loop to create terminal progress bar.
Args:
perc - Required : current percentages (Float)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : pos number of decimals in % complete (Int)
barLength - Optional : character length of bar (Int)
"""
format_str = '{0:.' + str(decimals) + 'f}'
perc_str = format_str.format(perc * 100)
filled_len = int(round(bar_len * perc))
bar = '*' * filled_len + '-' * (bar_len - filled_len)
sys.stdout.write('\r%s |%s| %s%s %s' % (prefix, bar, perc_str, '%', suffix)),
if perc == 1:
sys.stdout.write('\n')
sys.stdout.flush()
def get_connected_devices():
"""Returns list of adb device ids that are connected."""
proc = subprocess.Popen('adb devices'.split(), stdout=subprocess.PIPE)
output, error = proc.communicate()
connected = []
# Collect connected devices.
# Note that since Windows includes a carriage return, we
# do it in a seperate loop.
if platform.system() is not 'Windows':
for emulator_entry in output.split('\n')[1:]:
if emulator_entry != '':
connected.append(emulator_entry.split('\t')[0])
else:
for emulator_entry in output.split('\r\n')[1:]:
if emulator_entry != '':
connected.append(emulator_entry.split('\t')[0])
return connected
def test_connected(devices):
"""Verify that the expected number of devices/emulators are still connected.
Args:
devices: Number of expected devices.
Returns:
A tuple of form (success, connected).
The success member indicates whether the expected number
of devices were found.
The connected member contains a list of device serial numbers
identifying the connected devices.
"""
# verify expected emulators/devices are present
# Note that since Windows includes a carriage return, we do it in a seperate loop.
connected = get_connected_devices()
success = True
if len(connected) != devices:
print('\n\nERROR:\nExpected number of connections: ' +
str(devices))
print('Found: ' + str(len(connected)))
success = False
return success, connected
def noop():
"""Function that does absolutely nothing.
This is useful as a placeholder / default function
for function arguments, such as the setup and teardown arguments
of the launcher function.
"""
pass
def launcher(test_fn, duration, devices, setup=noop, cleanup=noop, is_print_progress=False, log_dir='logs'):
"""Higher-order function for launching tests
Args:
test_fn: Function that executes a single iteration of a test. This function must take a single argument,
which is the device under test, and must return a boolean value indicating the success (True)
or failure (False) of the test. Failure may also be indicated by raising an exception.
duration: Maximum elapsed running time
devices: Number of expected devices.
setup: Function that performs any necessary setup steps before the test is run
(optional — defaults to noop).
cleanup: Function that performs any necessary cleanup steps after the test is run
(optional — defaults to noop).
is_print_progress: If True, progress information is printed to stdout after each iteration of the test.
If False (the default), progress information is not printed.
If any other value (i.e., non-boolean) is provided for this argument,
the behaviour of this function is undefined.
log_dir: base directory under which logs will be placed.
Returns:
True if the test ran successfully to completion, otherwise False.
"""
# ThreadPool for running the tests in parallel.
# We choose the size to match the number of devices, so that every device can execute in parallel.
thread_pool = pool.ThreadPool(processes = devices)
connected_devices = get_connected_devices()
try:
setup()
duration_sec = int(duration * 3600)
start = time.time()
stop = start + duration_sec
print_progress(0, prefix='Progress:', suffix='Complete', bar_len=50)
next_progress_time = start + 60
iteration = 0
while time.time() < stop:
if is_print_progress and time.time() > next_progress_time:
# Print the progress per minute
print_progress(float(time.time()-start)/duration_sec, prefix='Progress:', suffix='Complete', bar_len=50)
next_progress_time += 60
connection_success, connected = test_connected(devices)
if not connection_success:
failure_time = time.time() - start
for device in connected_devices:
if device not in connected:
filename = os.path.join(log_dir, device, str(iteration) + '.txt')
msg = ("Device failed connection test for interation "
+ str(iteration)
+ "(at " + str(failure_time) + " seconds)")
spit(filename, msg)
# if no devices are connected, then end test with failure.
if not connected:
return False
# Run one iteration of the test against every device in parallel
iteration += 1
results = thread_pool.map(test_fn, connected)
# Verify the results
for result in results:
if not result:
return False
# Capture logcat.
logs = thread_pool.map(logcat, connected)
for device,log in zip(connected, logs):
if log:
filename = os.path.join(log_dir, device, str(iteration) + '.txt')
spit(filename, log)
# If we get here, the test completed successfully.
if is_print_progress:
# Print the progress bar one last time, to show 100%.
print_progress(1, prefix='Progress:', suffix='Complete', bar_len=50)
print('\nSUCCESS\n')
return True
finally:
print('\nIterations: %s\n' % iteration)
cleanup()
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
'-d', '--duration', metavar='float', type=float, default=1,
help='Duration of time to run stress test (in hrs)')
parser.add_argument(
'-c', '--count', metavar='int', type=int, default=1,
help='Number of devices/emulators connected')
parser.add_argument(
'-p', '--progress', default=False,
action='store_const', const=True,
help='Print progress')
parser.add_argument(
'--log-dir', type=str, default='logs',
help='Directory under which log files will be placed (defaults to "logs")')
return parser.parse_args()
def adb(dut, cmd):
"""Helper function for running adb commands.
Args:
dut: Device under tests.
cmd: List containing adb command to run arguments.
Returns:
String containing the comand's output.
"""
adb_cmd = ['adb', '-s', dut] + cmd
return subprocess.check_output(adb_cmd)
def logcat(dut):
"""Get logcat of specified device.
Args:
dut: Device under test.
cmd: List containing adb command to run arguments.
Returns:
String containing the command's output.
"""
try:
cmd = ['shell', 'logcat', '-d', '-v', 'threadtime']
return adb(dut, cmd)
except:
return None
def spit(filename, text):
"""Writes given text to specified file.
Args:
filename: Name of file to write to.
text: The text to write.
"""
# Ensure the enclosing directory exists.
directory = os.path.dirname(filename)
if not os.path.exists(directory):
os.makedirs(directory)
# Write the file.
out_file = open(filename, 'w+')
out_file.write(text)
out_file.close()