blob: 88426bc431648ffdbb87be03cbc7321162bf3980 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys, argparse, os
import subprocess
import re
class CurrentUserState:
def __init__(self, args):
self.args = args
self.current_user = get_current_user(args)
def name(self):
return "RUN_ON_CURRENT_USER"
def is_active(self, device_state):
return True
def include_annotations(self):
return []
def initialise(self, device_state):
pass
def enter(self):
debug(self.args, "[Test] Entering state " + self.name())
def get_user(self):
return self.current_user
class SystemUserState:
def __init__(self, args):
self.args = args
def name(self):
return "RUN_ON_SYSTEM_USER"
def is_active(self, device_state):
return device_state["current_user"] == 0
def include_annotations(self):
return []
def initialise(self, device_state):
pass
def enter(self):
debug(self.args, "[Test] Entering state " + self.name())
execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", "0"])
def get_user(self):
return 0
class SecondaryUserState:
def __init__(self, args):
self.args = args
def name(self):
return "RUN_ON_SECONDARY_USER"
def is_active(self, device_state):
return self._is_secondary_user(device_state["users"][device_state["current_user"]])
def include_annotations(self):
return ["com.android.bedstead.harrier.annotations.RequireRunOnSecondaryUser"]
def initialise(self, device_state):
self.user_id = device_state["current_user"]
def enter(self):
debug(self.args, "[Test] Entering state " + self.name())
self.user_id = self._get_or_create_secondary_user()
execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(self.user_id)])
execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", str(self.user_id), supported_modules[self.args.module][PACKAGE_NAME]])
def _get_or_create_secondary_user(self):
users = get_users(self.args)
for user in users.values():
if self._is_secondary_user(user):
return user["id"]
return create_user(self.args)
def _is_secondary_user(self, user):
return user["type"] == "full.SECONDARY"
def get_user(self):
return self.user_id
class WorkProfileState:
def __init__(self, args):
self.args = args
def name(self):
return "RUN_ON_WORK_PROFILE"
def is_active(self, device_state):
return self._has_work_profile(device_state["users"][device_state["current_user"]])
def include_annotations(self):
return ["com.android.bedstead.harrier.annotations.RequireRunOnWorkProfile"]
def initialise(self, device_state):
self.user_id = device_state["users"][device_state["current_user"]]["work_profile_id"]
def enter(self):
debug(self.args, "[Test] Entering state " + self.name())
user = self._get_or_create_work_profile()
self.user_id = user["id"]
execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(user["parent"])])
execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", str(self.user_id), supported_modules[self.args.module][PACKAGE_NAME]])
def _get_or_create_work_profile(self):
users = get_users(self.args)
for user in users.values():
if self._is_work_profile(user):
return user
# TODO: Support headless
work_profile_id = create_work_profile(self.args, 0)
return {"id": work_profile_id, "type": "profile.MANAGED", "flags": None, "parent": "0"}
def get_user(self):
return self.user_id
def _has_work_profile(self, user):
return "work_profile_id" in user
def _is_work_profile(self, user):
return user["type"] == "profile.MANAGED"
RUN_ON_CURRENT_USER = CurrentUserState
RUN_ON_SYSTEM_USER = SystemUserState
RUN_ON_SECONDARY_USER = SecondaryUserState
RUN_ON_WORK_PROFILE = WorkProfileState
STATE_CODES = {
"c": RUN_ON_CURRENT_USER,
"s": RUN_ON_SYSTEM_USER,
"e": RUN_ON_SECONDARY_USER,
"w": RUN_ON_WORK_PROFILE
}
# We hardcode supported modules so we can optimise for
# development of those modules. It is not our intention to support all tests.
supported_modules = {
"CtsDevicePolicyTestCases": ("android.devicepolicy.cts", "androidx.test.runner.AndroidJUnitRunner", [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER]),
"CtsMultiUserTestCases": ("android.multiuser.cts", "androidx.test.runner.AndroidJUnitRunner", [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER])
}
PACKAGE_NAME = 0
RUNNER = 1
STATES = 2
# Theme configuration
RESET_CODE = "\33[0m"
CLASS_NAME_COLOUR_CODE = "\33[35m"
TEST_NAME_COLOUR_CODE = "\33[33m"
PASSED_CODE = "\33[32m"
FAILED_CODE = "\33[31m"
IGNORED_CODE = "\33[33m"
AVAILABLE_PARAMETER_COLOUR_CODES = [
'\33[40m',
'\33[41m',
'\33[42m',
'\33[43m',
'\33[44m',
'\33[45m',
'\33[46m',
'\33[47m'
]
def find_module_for_class_method(class_method):
""" If only a class#method is provided, see if we can find the module. Will return None if not. """
matching_modules = []
for module in supported_modules:
if class_method.startswith(supported_modules[module][0]):
matching_modules.append(module)
if len(matching_modules) == 0:
return None
elif len(matching_modules) == 1:
return matching_modules[0]
else:
print("Found multiple potential modules. Please add module name to command")
sys.exit(1)
def get_args():
""" Parse command line arguments. """
parser = argparse.ArgumentParser(description="Run tests during development")
parser.add_argument("target", type=str, help="The target to run. This is in the form module:class#method. Method and class are optional")
parser.add_argument("-b", "--build", action='store_true', help="Builds test targets. (default)")
parser.add_argument("-i", "--install", action='store_true', help="Builds test targets. (default)")
parser.add_argument("-t", "--test", action='store_true', help="Builds test targets. (default)")
parser.add_argument("-d", "--debug", action="store_true", help="Include debug output.")
parser.add_argument("-s", "--states", help="Specify states which should be included. Options are (current (s)ystem s(e)condary, (w)ork profile. Defaults to all states.")
args = parser.parse_args()
if not args.build and not args.install and not args.test:
args.build = True
args.install = True
args.test = True
if not args.states:
args.states = "sew"
args.states = set(args.states)
valid_states = ["c", "s", "e", "w"]
for state in args.states:
if not state in valid_states:
print("State " + state + " is invalid, must be one of " + str(valid_states))
sys.exit(1)
args.states = [STATE_CODES[a] for a in args.states]
load_module_and_class_method(args)
return args
def load_module_and_class_method(args):
""" Parse target from args and load module and class_method. """
target_parts = args.target.split(":", 1)
args.module = target_parts[0]
args.class_method = target_parts[1] if len(target_parts) > 1 else None
if not args.module in supported_modules:
# Let's guess that maybe they ommitted the module
args.class_method = args.module
args.module = find_module_for_class_method(args.class_method)
if not args.module:
print("Could not find module or module not supported. btest only supports a small number of test modules.")
sys.exit(1)
def build_module(args):
print("Building module " + args.module)
build_top = os.environ["ANDROID_BUILD_TOP"]
# Unfortunately I haven't figured out a way to just import the envsetup so we need to run it each time
command = ". " + build_top + "/build/envsetup.sh && m " + args.module
debug(args, "[Build] Executing '" + command + "'")
subprocess.run(command, text=True, shell=True, executable="/bin/bash")
def install(args):
out = os.environ["OUT"]
command = ["adb install --user all -t -g " + out + "/testcases/" + args.module + "/*/" + args.module + ".apk"]
execute_shell_command("Install", args, command, shell=True, executable="/bin/bash")
class Test:
def __init__(self, args, state, btest_run, total_test_count, next_test, include_annotations, exclude_annotations, has_later_states):
self.args = args
self.state = state
self.module_package = supported_modules[args.module][PACKAGE_NAME]
self.runner = supported_modules[args.module][RUNNER]
self.parameter_colour_codes = {}
self.available_parameter_colour_codes_pointer = 0
self.total_test_count = total_test_count
self.next_test = next_test
self.btest_run = btest_run
self.test_results = {}
self.include_annotations = include_annotations
self.exclude_annotations = exclude_annotations
self.has_no_tests = False # Marked at the end of the test if there were no new tests
self.has_loaded_total_test_count = False # Used to ensure we don't double count test counts
self.has_later_states = has_later_states # True if we don't know the full number of tests because we'll be running more states later
def run(self):
command = "adb shell am instrument --user " + str(self.state.get_user())
# Use the formatted output
command += " -e listener com.android.bedstead.harrier.BedsteadRunListener"
if self.include_annotations:
command += " -e annotation " + ",".join(self.include_annotations)
if self.exclude_annotations:
command += " -e notAnnotation " + ",".join(self.exclude_annotations)
if self.args.class_method:
if self.args.class_method.endswith("*"):
# We need to escape 3 times to get through the various interpreters
# Using regex adds 5 seconds or so to running a single test so has to be opt-in
command += " -e tests_regex " + re.escape(re.escape(re.escape(self.args.class_method[:-1]))) + ".*"
else:
command += " -e class " + self.args.class_method
command += " -w " + self.module_package + "/" + self.runner
if self.args.debug:
print("[Test] Executing '" + command + "'")
if self.args.debug:
self.test_process = subprocess.Popen([command], shell=True, executable="/bin/bash")
else:
self.test_process = subprocess.Popen([command], shell=True, executable="/bin/bash", stdout=subprocess.PIPE)
if self.args.debug:
print("[Test] About to sleep")
import time
# TODO: For some reason we need a sleep otherwise the test_process doesn't launch... look into this
time.sleep(2)
if self.args.debug:
print("[Test] Slept")
num_tests = self.get_num_tests()
self.total_test_count += num_tests
modified_total_test_count = str(self.total_test_count)
if self.has_later_states:
modified_total_test_count += "+"
total_test_length = len(modified_total_test_count)
for i in range(num_tests):
result = self.get_result(i)
test_name_parts = re.split('[#\[\]]', result["testName"])
print("[" + str(self.next_test).rjust(total_test_length) + "/" + modified_total_test_count + "] " + CLASS_NAME_COLOUR_CODE + test_name_parts[0] + RESET_CODE + "#" + TEST_NAME_COLOUR_CODE + test_name_parts[1] + RESET_CODE, end='')
self.next_test += 1
if len(test_name_parts) > 2:
print("[" + self.get_parameter_colour_code(test_name_parts[2]) + test_name_parts[2] + RESET_CODE + "]", end='')
sys.stdout.flush()
while not result["isFinished"]:
result = self.get_result(i)
self.print_result(result)
while self.tests_are_running():
pass
if not self.args.debug:
out, err = self.test_process.communicate()
self.out = out.decode('utf-8')
if "Process crashed before executing the test" in self.out:
print(self.out)
sys.exit(1)
def tests_are_running(self):
return self.test_process.poll() is None
def get_num_tests(self):
numTests = -1
while numTests == -1:
if not self.tests_are_running():
print("Error - tests not running")
sys.exit(0)
output, err = execute_shell_command("TEST", self.args, ["adb", "shell", "content query --user " + str(self.state.get_user()) + " --uri content://" + self.module_package + ".BedsteadRunResultsProvider/numTests"])
if not output:
continue # Not running yet?
if "No result found" in output:
continue
numTests = int(output.split("tests=", 2)[1].strip())
return numTests
def get_result(self, i):
result = None
while not result:
output, err = execute_shell_command("TEST", self.args, ["adb", "shell", "content query --user " + str(self.state.get_user()) + " --uri content://" + self.module_package + ".BedsteadRunResultsProvider/" + str(i)])
if not output:
continue # Not running yet?
if "No result found" in output:
continue
result = {}
result["index"] = int(output.split("index=", 2)[1].split(",", 2)[0])
result["testName"] = output.split("testName=", 2)[1].split(", result=", 2)[0]
result["isFinished"] = output.split("isFinished=", 2)[1].strip() == "true"
if result["isFinished"]:
result["result"] = int(output.split("result=", 2)[1].split(",", 2)[0])
result["message"] = output.split("message=", 2)[1].split(", stackTrace=", 2)[0]
result["stackTrace"] = output.split("stackTrace=", 2)[1].split(", runTime=", 2)[0]
result["runTime"] = int(output.split("runTime=", 2)[1].split(",", 2)[0])
return result
def get_parameter_colour_code(self, parameter):
if not parameter in self.parameter_colour_codes:
self.parameter_colour_codes[parameter] = AVAILABLE_PARAMETER_COLOUR_CODES[self.available_parameter_colour_codes_pointer]
self.available_parameter_colour_codes_pointer = (self.available_parameter_colour_codes_pointer + 1) % len(AVAILABLE_PARAMETER_COLOUR_CODES)
return self.parameter_colour_codes[parameter]
def print_result(self, test_result):
if test_result["result"] == 0:
self.btest_run.passed_tests.append(test_result)
print(" ✅ " + PASSED_CODE + "PASSED" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
elif test_result["result"] == 1:
self.btest_run.failed_tests.append(test_result)
print(" ❌ " + FAILED_CODE + "FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")\n\n" + test_result["stackTrace"] + "\n", flush=True)
elif test_result["result"] == 2:
self.btest_run.ignored_tests.append(test_result)
print(" " + IGNORED_CODE + "// IGNORED" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
elif test_result["result"] == 3:
self.btest_run.assumption_failed_tests.append(test_result)
print(" " + IGNORED_CODE + "// ASSUMPTION FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
else:
print("ERROR PARSING TEST RESULT " + test_result, flush=True)
sys.exit(1)
def format_nanos(nanos):
ms = int(nanos) / 1000000
if ms < 800:
return "{:.2f}ms".format(ms)
seconds = ms / 1000
if seconds < 60:
return "{:.2f}s".format(seconds)
minutes = seconds / 60
return "{:.2f}m".format(minutes)
class BtestRun:
def __init__(self):
self.passed_tests = []
self.failed_tests = []
self.ignored_tests = []
self.assumption_failed_tests = []
def execute_shell_command(stage, args, command, **extra_args):
debug(args, "[" + stage + "] Executing '" + " ".join(command) + "'")
r = subprocess.run(command, capture_output=True, text=True, **extra_args)
output = r.stdout
debug(args, "[" + stage + "] Output: '" + output + "' Err: '" + r.stderr + "'")
return output, r.stderr
def create_user(args):
output, err = execute_shell_command("Test", args, ["adb", "shell", "pm", "create-user", "user"])
id = int(output.rsplit(" ", 1)[1].strip())
execute_shell_command("Test", args, ["adb", "shell", "am start-user " + str(id)])
return id
def create_work_profile(args, parent_id):
output, err = execute_shell_command("Test", args, ["adb", "shell", "pm", "create-user", "--managed", "--profileOf", str(parent_id), "user"])
id = int(output.rsplit(" ", 1)[1].strip())
execute_shell_command("Test", args, ["adb", "shell", "am start-user " + str(id)])
return id
def gather_device_state(args):
current_user = get_current_user(args)
users = get_users(args)
return {"current_user": current_user, "users": users}
def get_users(args):
users_output, err = execute_shell_command("Test", args, ["adb", "shell", "cmd user list -v"])
users = {}
for user_row in users_output.split("\n")[1:]:
if not user_row:
continue
id = int(user_row.split("id=", 2)[1].split(",", 2)[0])
type = user_row.split("type=", 2)[1].split(",", 2)[0]
flags = user_row.split("flags=", 2)[1].split(" ", 2)[0].split("|")
parent = None
if "MANAGED_PROFILE" in flags:
parent = int(user_row.split("parentId=", 2)[1].split(")", 2)[0])
user = {"id": id, "flags": flags, "type": type, "parent": parent}
users[user["id"]] = user
for user in users.values():
if user["type"] == "profile.MANAGED":
users[user["parent"]]["work_profile_id"] = user["id"]
return users
def get_current_user(args):
return int(execute_shell_command("Test", args, ["adb", "shell", "am", "get-current-user"])[0].strip())
def main():
args = get_args()
if args.build:
build_module(args)
if args.install:
install(args)
if args.test:
test = None
btest_run = BtestRun()
total_test_count = 0
next_test = 1
device_state = gather_device_state(args)
# Construct modules with args
states = [m(args) for m in supported_modules[args.module][STATES]]
if RUN_ON_CURRENT_USER in args.states:
for state in states:
if (state.is_active(device_state)):
# Found current
args.states.append(state.__class__)
break
# We calculate annotations before filtering so we properly exclude all
all_include_annotations = []
for state in states:
all_include_annotations.extend(state.include_annotations())
states = [m for m in states if m.__class__ in args.states]
first_state = None
for state in states:
if (state.is_active(device_state)):
first_state = state
state.initialise(device_state) # Entering a state we are already in
break
if first_state is None:
# We are not in any state, enter the first one arbitrarily
first_state = states[0]
first_state.enter()
# Move to start
states.insert(0, states.pop(states.index(first_state)))
needs_to_enter_state = False
try:
for i, state in enumerate(states):
debug(args, "[Test] Running tests for " + state.name())
if needs_to_enter_state:
state.enter()
include_annotations = state.include_annotations()
exclude_annotations = [x for x in all_include_annotations if not x in include_annotations]
test = Test(args, state, btest_run, total_test_count, next_test, include_annotations, exclude_annotations, (i < len(states) - 1))
test.run()
total_test_count = test.total_test_count
next_test = test.next_test
needs_to_enter_state = True
except KeyboardInterrupt:
# Kill the test process then move on to print the results
if test is not None:
test.test_process.kill()
except Exception as e:
if test is not None:
test.test_process.kill()
raise e
print("\n" + PASSED_CODE + "Passed: " + str(len(btest_run.passed_tests)) + RESET_CODE
+ "," + FAILED_CODE + " Failed: " + str(len(btest_run.failed_tests)) + RESET_CODE
+ "," + IGNORED_CODE + " Ignored: " + str(len(btest_run.ignored_tests)) + RESET_CODE
+ ", " + IGNORED_CODE + "Assumption Failed: " + str(len(btest_run.assumption_failed_tests)) + RESET_CODE)
if len(btest_run.failed_tests) > 0:
print("\n\nFailures:")
for test_result in btest_run.failed_tests:
print(test_result["testName"] + " ❌ " + FAILED_CODE + "FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
def debug(args, msg):
if args.debug:
print(msg)
if __name__ == '__main__':
main()