| #!/usr/bin/env python |
| # |
| # Copyright (C) 2022 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the 'License'); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an 'AS IS' BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import sys, argparse, os |
| import subprocess |
| import re |
| |
class CurrentUserState:
    """State that targets whichever user was current when it was constructed."""

    def __init__(self, args):
        self.args = args
        # Looked up once, at construction time, via the device.
        self.current_user = get_current_user(args)

    def name(self):
        """Return the identifier of this state."""
        return "RUN_ON_CURRENT_USER"

    def is_active(self, device_state):
        """Report this state as active regardless of ``device_state``."""
        return True

    def include_annotations(self):
        """Return the annotation filters for this state (none)."""
        return list()

    def initialise(self, device_state):
        """Nothing to set up when the device is already in this state."""
        return None

    def enter(self):
        """Log the transition; no device command is issued."""
        debug(self.args, "[Test] Entering state " + self.name())

    def get_user(self):
        """Return the user id captured at construction time."""
        return self.current_user
| |
class SystemUserState:
    """State in which tests run on user 0."""

    def __init__(self, args):
        self.args = args

    def name(self):
        """Return the identifier of this state."""
        return "RUN_ON_SYSTEM_USER"

    def is_active(self, device_state):
        """Return True when the device's current user is user 0."""
        current_user = device_state["current_user"]
        return current_user == 0

    def include_annotations(self):
        """Return the annotation filters for this state (none)."""
        return list()

    def initialise(self, device_state):
        """Nothing to set up when the device is already in this state."""
        return None

    def enter(self):
        """Switch the device to user 0."""
        debug(self.args, "[Test] Entering state " + self.name())
        switch_command = ["adb", "shell", "am", "switch-user", "0"]
        execute_shell_command("Test", self.args, switch_command)

    def get_user(self):
        """Return the user id tests run on, which is always 0."""
        return 0
| |
class SecondaryUserState:
    """State in which tests run on a user of type ``full.SECONDARY``."""

    def __init__(self, args):
        self.args = args

    def name(self):
        """Return the identifier of this state."""
        return "RUN_ON_SECONDARY_USER"

    def is_active(self, device_state):
        """Return True when the device's current user is a secondary user."""
        users = device_state["users"]
        return self._is_secondary_user(users[device_state["current_user"]])

    def include_annotations(self):
        """Return the annotation that selects tests for this state."""
        return ["com.android.bedstead.harrier.annotations.RequireRunOnSecondaryUser"]

    def initialise(self, device_state):
        """Adopt the current user as the user to run on."""
        self.user_id = device_state["current_user"]

    def enter(self):
        """Find or create a secondary user, switch to it and install the test package there."""
        debug(self.args, "[Test] Entering state " + self.name())

        self.user_id = self._get_or_create_secondary_user()
        user_arg = str(self.user_id)
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", user_arg])
        package = supported_modules[self.args.module][PACKAGE_NAME]
        execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", user_arg, package])

    def _get_or_create_secondary_user(self):
        """Return the id of the first existing secondary user, creating one if there is none."""
        existing = next(
            (candidate for candidate in get_users(self.args).values() if self._is_secondary_user(candidate)),
            None,
        )
        if existing is not None:
            return existing["id"]
        return create_user(self.args)

    def _is_secondary_user(self, user):
        """Return True when the user record's type is ``full.SECONDARY``."""
        return "full.SECONDARY" == user["type"]

    def get_user(self):
        """Return the user id set by ``initialise`` or ``enter``."""
        return self.user_id
| |
class WorkProfileState:
    """State in which tests run on a user of type ``profile.MANAGED``."""

    def __init__(self, args):
        self.args = args

    def name(self):
        """Return the identifier of this state."""
        return "RUN_ON_WORK_PROFILE"

    def is_active(self, device_state):
        """Return True when the device's current user has a work profile attached."""
        users = device_state["users"]
        return self._has_work_profile(users[device_state["current_user"]])

    def include_annotations(self):
        """Return the annotation that selects tests for this state."""
        return ["com.android.bedstead.harrier.annotations.RequireRunOnWorkProfile"]

    def initialise(self, device_state):
        """Adopt the current user's work profile as the user to run on."""
        current = device_state["users"][device_state["current_user"]]
        self.user_id = current["work_profile_id"]

    def enter(self):
        """Find or create a work profile, switch to its parent and install the test package in it."""
        debug(self.args, "[Test] Entering state " + self.name())
        profile = self._get_or_create_work_profile()
        self.user_id = profile["id"]
        # The device is switched to the profile's parent; tests target the profile id.
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(profile["parent"])])
        package = supported_modules[self.args.module][PACKAGE_NAME]
        execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", str(self.user_id), package])

    def _get_or_create_work_profile(self):
        """Return the first existing work profile record, creating one under user 0 if there is none."""
        existing = next(
            (candidate for candidate in get_users(self.args).values() if self._is_work_profile(candidate)),
            None,
        )
        if existing is not None:
            return existing

        # TODO: Support headless
        new_profile_id = create_work_profile(self.args, 0)
        return {"id": new_profile_id, "type": "profile.MANAGED", "flags": None, "parent": "0"}

    def get_user(self):
        """Return the work profile id set by ``initialise`` or ``enter``."""
        return self.user_id

    def _has_work_profile(self, user):
        """Return True when the user record carries a ``work_profile_id`` key."""
        return "work_profile_id" in user

    def _is_work_profile(self, user):
        """Return True when the user record's type is ``profile.MANAGED``."""
        return "profile.MANAGED" == user["type"]
| |
# Symbolic names for the state classes.
RUN_ON_CURRENT_USER, RUN_ON_SYSTEM_USER = CurrentUserState, SystemUserState
RUN_ON_SECONDARY_USER, RUN_ON_WORK_PROFILE = SecondaryUserState, WorkProfileState

# Maps each single-letter code accepted by --states to its state class.
STATE_CODES = dict(
    c=RUN_ON_CURRENT_USER,
    s=RUN_ON_SYSTEM_USER,
    e=RUN_ON_SECONDARY_USER,
    w=RUN_ON_WORK_PROFILE,
)
| |
# Positions of the fields inside each supported_modules entry.
PACKAGE_NAME, RUNNER, STATES = 0, 1, 2

# The set of modules is hardcoded on purpose: the tool is optimised for
# developing these modules and does not aim to support every test.
# Each entry is (package name, instrumentation runner, state classes).
supported_modules = {
    "CtsDevicePolicyTestCases": (
        "android.devicepolicy.cts",
        "androidx.test.runner.AndroidJUnitRunner",
        [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER],
    ),
    "CtsMultiUserTestCases": (
        "android.multiuser.cts",
        "androidx.test.runner.AndroidJUnitRunner",
        [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER],
    ),
}
| |
# Theme configuration: ANSI escape sequences used when printing results.
RESET_CODE = "\033[0m"
CLASS_NAME_COLOUR_CODE = "\033[35m"
TEST_NAME_COLOUR_CODE = "\033[33m"
PASSED_CODE = "\033[32m"
FAILED_CODE = "\033[31m"
IGNORED_CODE = "\033[33m"

# Escape codes 40 through 47, handed out in order to test parameters.
AVAILABLE_PARAMETER_COLOUR_CODES = ["\033[" + str(code) + "m" for code in range(40, 48)]
| |
def find_module_for_class_method(class_method):
    """Guess the module for a bare class#method target.

    Returns the single supported module whose package name is a prefix of
    ``class_method``, or None when no module matches. Exits the process with
    status 1 when more than one module matches.
    """
    candidates = [
        module_name
        for module_name in supported_modules
        if class_method.startswith(supported_modules[module_name][0])
    ]

    if not candidates:
        return None
    if len(candidates) > 1:
        print("Found multiple potential modules. Please add module name to command")
        sys.exit(1)
    return candidates[0]
| |
def get_args():
    """Parse and normalise the command line arguments.

    When none of --build/--install/--test is given, all three stages are
    enabled. ``args.states`` is converted from a string of state codes into a
    list of state classes (duplicates removed, order preserved), and
    ``args.module`` / ``args.class_method`` are filled in from the target.

    Exits the process with status 1 when an unknown state code is supplied.
    """
    parser = argparse.ArgumentParser(description="Run tests during development")
    parser.add_argument("target", type=str, help="The target to run. This is in the form module:class#method. Method and class are optional")
    parser.add_argument("-b", "--build", action="store_true", help="Builds test targets. (default)")
    parser.add_argument("-i", "--install", action="store_true", help="Installs test targets. (default)")
    parser.add_argument("-t", "--test", action="store_true", help="Runs tests. (default)")
    parser.add_argument("-d", "--debug", action="store_true", help="Include debug output.")
    parser.add_argument("-s", "--states", help="Specify states which should be included. Options are (c)urrent, (s)ystem, s(e)condary, (w)ork profile. Defaults to s, e and w.")
    args = parser.parse_args()

    # No stage selected means "run every stage".
    if not (args.build or args.install or args.test):
        args.build = True
        args.install = True
        args.test = True

    if not args.states:
        args.states = "sew"

    # De-duplicate the codes while keeping the order they were typed in, so
    # the resulting list is deterministic.
    state_codes = list(dict.fromkeys(args.states))
    valid_states = list(STATE_CODES)
    for state in state_codes:
        if state not in valid_states:
            print("State " + state + " is invalid, must be one of " + str(valid_states))
            sys.exit(1)
    # Must stay a list: main() appends to it.
    args.states = [STATE_CODES[code] for code in state_codes]

    load_module_and_class_method(args)

    return args
| |
def load_module_and_class_method(args):
    """Split ``args.target`` into ``args.module`` and ``args.class_method``.

    The target is ``module:class#method``. When the part before the colon is
    not a supported module, it is treated as the class#method and the module
    is guessed from it. Exits with status 1 when no module can be found.
    """
    module, separator, class_method = args.target.partition(":")
    args.module = module
    args.class_method = class_method if separator else None

    if args.module in supported_modules:
        return

    # Perhaps the module was omitted and only class#method was given.
    args.class_method = args.module
    args.module = find_module_for_class_method(args.class_method)
    if not args.module:
        print("Could not find module or module not supported. btest only supports a small number of test modules.")
        sys.exit(1)
| |
def build_module(args):
    """Build ``args.module`` with ``m`` from the Android build environment.

    Exits the process with status 1 when ANDROID_BUILD_TOP is not set or when
    the build command fails, so that later stages never run against a stale
    or missing APK.
    """
    print("Building module " + args.module)
    build_top = os.environ.get("ANDROID_BUILD_TOP")
    if not build_top:
        print("ANDROID_BUILD_TOP is not set. Set up the Android build environment first.")
        sys.exit(1)

    # envsetup.sh cannot simply be imported, so it is sourced on every build.
    command = ". " + build_top + "/build/envsetup.sh && m " + args.module
    debug(args, "[Build] Executing '" + command + "'")
    result = subprocess.run(command, text=True, shell=True, executable="/bin/bash")
    if result.returncode != 0:
        print("Building module " + args.module + " failed with exit code " + str(result.returncode))
        sys.exit(1)
| |
def install(args):
    """Install the built APK of ``args.module`` for all users via adb.

    The APK path contains a wildcard, so the command is run through bash.
    """
    out_dir = os.environ["OUT"]
    apk_glob = f"{out_dir}/testcases/{args.module}/*/{args.module}.apk"
    execute_shell_command(
        "Install",
        args,
        ["adb install --user all -t -g " + apk_glob],
        shell=True,
        executable="/bin/bash",
    )
| |
class Test:
    """Runs the instrumentation for one state and prints results as they arrive.

    The instrumentation is started in the background with ``adb shell am
    instrument``; progress is read by polling the module's
    ``BedsteadRunResultsProvider`` content provider through adb.
    """

    def __init__(self, args, state, btest_run, total_test_count, next_test, include_annotations, exclude_annotations, has_later_states):
        self.args = args
        self.state = state
        self.module_package = supported_modules[args.module][PACKAGE_NAME]
        self.runner = supported_modules[args.module][RUNNER]
        self.parameter_colour_codes = {}
        self.available_parameter_colour_codes_pointer = 0
        self.total_test_count = total_test_count
        self.next_test = next_test
        self.btest_run = btest_run
        self.test_results = {}
        self.include_annotations = include_annotations
        self.exclude_annotations = exclude_annotations
        self.has_no_tests = False # Marked at the end of the test if there were no new tests
        self.has_loaded_total_test_count = False # Used to ensure we don't double count test counts
        self.has_later_states = has_later_states # True if we don't know the full number of tests because we'll be running more states later

    def run(self):
        """Start the instrumentation, print each test result, then wait for it to end.

        Updates ``total_test_count`` and ``next_test``. Exits the process with
        status 1 when the captured output reports a crash before any test ran.
        """
        # Imported here because the module does not import ``time`` at top level.
        import time

        command = "adb shell am instrument --user " + str(self.state.get_user())

        # Use the formatted output
        command += " -e listener com.android.bedstead.harrier.BedsteadRunListener"

        if self.include_annotations:
            command += " -e annotation " + ",".join(self.include_annotations)

        if self.exclude_annotations:
            command += " -e notAnnotation " + ",".join(self.exclude_annotations)

        if self.args.class_method:
            if self.args.class_method.endswith("*"):
                # We need to escape 3 times to get through the various interpreters
                # Using regex adds 5 seconds or so to running a single test so has to be opt-in
                command += " -e tests_regex " + re.escape(re.escape(re.escape(self.args.class_method[:-1]))) + ".*"
            else:
                command += " -e class " + self.args.class_method

        command += " -w " + self.module_package + "/" + self.runner

        if self.args.debug:
            print("[Test] Executing '" + command + "'")
            # In debug mode the instrumentation output goes straight to our stdout.
            self.test_process = subprocess.Popen(command, shell=True, executable="/bin/bash")
            print("[Test] About to sleep")
        else:
            self.test_process = subprocess.Popen(command, shell=True, executable="/bin/bash", stdout=subprocess.PIPE)

        # TODO: For some reason we need a sleep otherwise the test_process doesn't launch... look into this
        time.sleep(2)
        if self.args.debug:
            print("[Test] Slept")

        num_tests = self.get_num_tests()
        self.total_test_count += num_tests

        # A trailing "+" signals that later states will add to the total.
        modified_total_test_count = str(self.total_test_count)
        if self.has_later_states:
            modified_total_test_count += "+"
        total_test_length = len(modified_total_test_count)

        for i in range(num_tests):
            result = self.get_result(i)

            # Test names look like class#method or class#method[parameter].
            test_name_parts = re.split(r'[#\[\]]', result["testName"])
            print("[" + str(self.next_test).rjust(total_test_length) + "/" + modified_total_test_count + "] " + CLASS_NAME_COLOUR_CODE + test_name_parts[0] + RESET_CODE + "#" + TEST_NAME_COLOUR_CODE + test_name_parts[1] + RESET_CODE, end='')
            self.next_test += 1

            if len(test_name_parts) > 2:
                print("[" + self.get_parameter_colour_code(test_name_parts[2]) + test_name_parts[2] + RESET_CODE + "]", end='')
            sys.stdout.flush()

            # Keep polling this test until the provider reports it finished.
            while not result["isFinished"]:
                result = self.get_result(i)

            self.print_result(result)

        if self.args.debug:
            self.test_process.wait()
        else:
            # communicate() drains the pipe while waiting. Spinning on poll()
            # with an unread pipe wastes CPU and can block forever once the
            # pipe buffer fills up.
            out, err = self.test_process.communicate()
            self.out = out.decode('utf-8')

            if "Process crashed before executing the test" in self.out:
                print(self.out)
                sys.exit(1)

    def tests_are_running(self):
        """Return True while the instrumentation process has not exited."""
        return self.test_process.poll() is None

    def _query_results_provider(self, path):
        """Query ``path`` on the module's BedsteadRunResultsProvider and return adb's stdout."""
        output, err = execute_shell_command("TEST", self.args, ["adb", "shell", "content query --user " + str(self.state.get_user()) + " --uri content://" + self.module_package + ".BedsteadRunResultsProvider/" + path])
        return output

    def get_num_tests(self):
        """Poll the results provider until it reports a test count and return it.

        Exits the process with status 1 if the instrumentation process has
        stopped before a count became available.
        """
        num_tests = -1
        while num_tests == -1:
            if not self.tests_are_running():
                print("Error - tests not running")
                sys.exit(1)

            output = self._query_results_provider("numTests")
            if not output:
                continue # Not running yet?
            if "No result found" in output:
                continue
            num_tests = int(output.split("tests=", 2)[1].strip())
        return num_tests

    def get_result(self, i):
        """Poll the results provider until it has a row for test ``i`` and return it as a dict.

        The dict always has ``index``, ``testName`` and ``isFinished``. When the
        test is finished it also has ``result``, ``message``, ``stackTrace``
        and ``runTime``.
        """
        result = None
        while not result:
            output = self._query_results_provider(str(i))
            if not output:
                continue # Not running yet?
            if "No result found" in output:
                continue

            result = {}
            result["index"] = int(output.split("index=", 2)[1].split(",", 2)[0])
            result["testName"] = output.split("testName=", 2)[1].split(", result=", 2)[0]
            result["isFinished"] = output.split("isFinished=", 2)[1].strip() == "true"
            if result["isFinished"]:
                result["result"] = int(output.split("result=", 2)[1].split(",", 2)[0])
                result["message"] = output.split("message=", 2)[1].split(", stackTrace=", 2)[0]
                result["stackTrace"] = output.split("stackTrace=", 2)[1].split(", runTime=", 2)[0]
                result["runTime"] = int(output.split("runTime=", 2)[1].split(",", 2)[0])

        return result

    def get_parameter_colour_code(self, parameter):
        """Return the colour code for ``parameter``, assigning the next one in rotation on first use."""
        if parameter not in self.parameter_colour_codes:
            self.parameter_colour_codes[parameter] = AVAILABLE_PARAMETER_COLOUR_CODES[self.available_parameter_colour_codes_pointer]
            self.available_parameter_colour_codes_pointer = (self.available_parameter_colour_codes_pointer + 1) % len(AVAILABLE_PARAMETER_COLOUR_CODES)
        return self.parameter_colour_codes[parameter]

    def print_result(self, test_result):
        """Print the outcome of a finished test and record it on ``btest_run``.

        Result codes: 0 passed, 1 failed, 2 ignored, 3 assumption failed. Any
        other code is reported and the process exits with status 1.
        """
        if test_result["result"] == 0:
            self.btest_run.passed_tests.append(test_result)
            print(" ✅ " + PASSED_CODE + "PASSED" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
        elif test_result["result"] == 1:
            self.btest_run.failed_tests.append(test_result)
            print(" ❌ " + FAILED_CODE + "FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")\n\n" + test_result["stackTrace"] + "\n", flush=True)
        elif test_result["result"] == 2:
            self.btest_run.ignored_tests.append(test_result)
            print(" " + IGNORED_CODE + "// IGNORED" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
        elif test_result["result"] == 3:
            self.btest_run.assumption_failed_tests.append(test_result)
            print(" " + IGNORED_CODE + "// ASSUMPTION FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
        else:
            # test_result is a dict, so it must be converted before concatenation.
            print("ERROR PARSING TEST RESULT " + str(test_result), flush=True)
            sys.exit(1)
| |
def format_nanos(nanos):
    """Render a duration given in nanoseconds with two decimals.

    Durations under 800ms are shown in milliseconds, durations under 60s in
    seconds, and anything longer in minutes.
    """
    millis = int(nanos) / 1000000
    if millis >= 800:
        secs = millis / 1000
        if secs >= 60:
            return f"{secs / 60:.2f}m"
        return f"{secs:.2f}s"
    return f"{millis:.2f}ms"
| |
class BtestRun:
    """Collects the test results of a whole run, grouped by outcome."""

    def __init__(self):
        # One independent list per outcome; results are appended as tests finish.
        self.passed_tests = list()
        self.failed_tests = list()
        self.ignored_tests = list()
        self.assumption_failed_tests = list()
| |
def execute_shell_command(stage, args, command, **extra_args):
    """Run ``command`` with captured text output and return ``(stdout, stderr)``.

    ``stage`` labels the debug log lines. ``command`` is a list of strings.
    ``extra_args`` is passed through to ``subprocess.run``.
    """
    label = "[" + stage + "] "
    debug(args, label + "Executing '" + " ".join(command) + "'")
    completed = subprocess.run(command, capture_output=True, text=True, **extra_args)
    stdout, stderr = completed.stdout, completed.stderr
    debug(args, label + "Output: '" + stdout + "' Err: '" + stderr + "'")
    return stdout, stderr
| |
def create_user(args):
    """Create a user named "user" on the device, start it and return its id.

    The id is taken from the last space-separated token of the command output.
    """
    stdout, _ = execute_shell_command("Test", args, ["adb", "shell", "pm", "create-user", "user"])
    last_token = stdout.rsplit(" ", 1)[1]
    new_user_id = int(last_token.strip())
    execute_shell_command("Test", args, ["adb", "shell", f"am start-user {new_user_id}"])
    return new_user_id
| |
def create_work_profile(args, parent_id):
    """Create a managed profile of ``parent_id`` named "user", start it and return its id.

    The id is taken from the last space-separated token of the command output.
    """
    create_command = ["adb", "shell", "pm", "create-user", "--managed", "--profileOf", str(parent_id), "user"]
    stdout, _ = execute_shell_command("Test", args, create_command)
    last_token = stdout.rsplit(" ", 1)[1]
    profile_id = int(last_token.strip())
    execute_shell_command("Test", args, ["adb", "shell", f"am start-user {profile_id}"])
    return profile_id
| |
def gather_device_state(args):
    """Return a dict with the device's ``current_user`` id and its ``users`` table."""
    # Dict displays evaluate in order: the current user is queried first.
    return {
        "current_user": get_current_user(args),
        "users": get_users(args),
    }
| |
def get_users(args):
    """Parse ``cmd user list -v`` into a dict of user records keyed by user id.

    Each record has ``id``, ``flags``, ``type`` and ``parent``. ``parent`` is
    None unless the flags include MANAGED_PROFILE. A user that is the parent
    of a ``profile.MANAGED`` user also gets a ``work_profile_id`` entry.
    """
    listing, _ = execute_shell_command("Test", args, ["adb", "shell", "cmd user list -v"])
    users = {}
    # The first line of the output is skipped; blank lines are ignored.
    for row in listing.split("\n")[1:]:
        if row:
            user_id = int(row.split("id=", 2)[1].split(",", 2)[0])
            user_type = row.split("type=", 2)[1].split(",", 2)[0]
            flags = row.split("flags=", 2)[1].split(" ", 2)[0].split("|")
            if "MANAGED_PROFILE" in flags:
                parent = int(row.split("parentId=", 2)[1].split(")", 2)[0])
            else:
                parent = None
            users[user_id] = {"id": user_id, "flags": flags, "type": user_type, "parent": parent}

    # Second pass: link every managed profile back onto its parent's record.
    for record in users.values():
        if record["type"] == "profile.MANAGED":
            users[record["parent"]]["work_profile_id"] = record["id"]

    return users
| |
def get_current_user(args):
    """Return the id of the device's current user as an int."""
    stdout, _ = execute_shell_command("Test", args, ["adb", "shell", "am", "get-current-user"])
    return int(stdout.strip())
| |
def _kill_test_process(test):
    """Kill the instrumentation process of ``test`` if one was ever started."""
    process = getattr(test, "test_process", None)
    if process is not None:
        process.kill()


def main():
    """Run the requested build, install and test stages.

    The test stage runs the module's tests once per selected state, starting
    with a state the device is already in when there is one, and finishes by
    printing a summary followed by the list of failures.
    """
    args = get_args()

    if args.build:
        build_module(args)

    if args.install:
        install(args)

    if args.test:
        test = None
        btest_run = BtestRun()
        total_test_count = 0
        next_test = 1

        device_state = gather_device_state(args)
        # Construct modules with args
        states = [m(args) for m in supported_modules[args.module][STATES]]

        if RUN_ON_CURRENT_USER in args.states:
            # "Current" is resolved to the first concrete state that is active.
            for state in states:
                if state.is_active(device_state):
                    args.states.append(state.__class__)
                    break

        # We calculate annotations before filtering so we properly exclude all
        all_include_annotations = []
        for state in states:
            all_include_annotations.extend(state.include_annotations())

        states = [m for m in states if m.__class__ in args.states]

        if not states:
            # Happens e.g. when only the current state was requested and the
            # device is not in any state this module supports.
            print("No states to run: the current user does not match any supported state. Select states explicitly with --states.")
            sys.exit(1)

        first_state = None

        for state in states:
            if state.is_active(device_state):
                first_state = state
                state.initialise(device_state) # Entering a state we are already in
                break

        if first_state is None:
            # We are not in any state, enter the first one arbitrarily
            first_state = states[0]
            first_state.enter()

        # Move to start
        states.insert(0, states.pop(states.index(first_state)))
        # The first state is already entered; every later one must be entered.
        needs_to_enter_state = False

        try:
            for i, state in enumerate(states):
                debug(args, "[Test] Running tests for " + state.name())
                if needs_to_enter_state:
                    state.enter()
                include_annotations = state.include_annotations()
                exclude_annotations = [x for x in all_include_annotations if x not in include_annotations]
                test = Test(args, state, btest_run, total_test_count, next_test, include_annotations, exclude_annotations, (i < len(states) - 1))
                test.run()
                total_test_count = test.total_test_count
                next_test = test.next_test
                needs_to_enter_state = True
        except KeyboardInterrupt:
            # Kill the test process then move on to print the results
            _kill_test_process(test)
        except Exception:
            # The process may not have been started yet; never let the cleanup
            # mask the original error.
            _kill_test_process(test)
            raise

        print("\n" + PASSED_CODE + "Passed: " + str(len(btest_run.passed_tests)) + RESET_CODE
              + "," + FAILED_CODE + " Failed: " + str(len(btest_run.failed_tests)) + RESET_CODE
              + "," + IGNORED_CODE + " Ignored: " + str(len(btest_run.ignored_tests)) + RESET_CODE
              + ", " + IGNORED_CODE + "Assumption Failed: " + str(len(btest_run.assumption_failed_tests)) + RESET_CODE)

        if len(btest_run.failed_tests) > 0:
            print("\n\nFailures:")
            for test_result in btest_run.failed_tests:
                print(test_result["testName"] + " ❌ " + FAILED_CODE + "FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
| |
def debug(args, msg):
    """Print ``msg`` only when ``args.debug`` is truthy."""
    if not args.debug:
        return
    print(msg)
| |
# Run the tool only when this file is executed as a script.
if __name__ == "__main__":
    main()