| #!/usr/bin/env python3 |
| # |
| # Copyright 2024 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Classes to help coordinate running tasks and displaying progress.""" |
| |
| import subprocess |
| import sys |
| import threading |
| |
| from .errors import TaskError |
| |
| |
| class Task: |
| """Defines a task to be run by the task_runner.""" |
| |
| def __init__(self, cmd, fall_back_tasks=None): |
| self.cmd = cmd |
| self.fall_back_tasks = fall_back_tasks |
| |
| |
| class TaskResult: |
| """Holds result and status code of a task.""" |
| |
| def __init__(self, status_code, result=''): |
| self.status_code = status_code |
| self.result = result |
| |
| |
| class TaskRunner: |
| """Runs a set of tasks and displays progress.""" |
| |
| def __init__(self): |
| self.tasks = {} |
| self.task_queue = [] |
| |
| self.running = False |
| |
| # UI |
| self.quiet = False |
| self.output = '' |
| self.running_indicator_thread = None |
| self.running_indicator_chars = ['→'] |
| # self.running_indicator_chars = ['◢', '◣', '◤', '◥'] |
| self.running_indicator_index = 0 |
| self.stop_event = threading.Event() |
| |
| def add_task(self, name, function, *args, fall_back_tasks=None, **kwargs): |
| """Adds a task to the queue.""" |
| self.tasks[name] = { |
| 'status': 'pending', |
| 'function': function, |
| 'output': '', |
| 'args': args, |
| 'kwargs': kwargs, |
| 'fall_back_tasks': fall_back_tasks, |
| } |
| self.task_queue.append(name) |
| |
| def start(self): |
| """Starts running all the tasks in the queue.""" |
| print('Running Plan:') |
| self.running = True |
| self._run_next_task() |
| |
| def run_task(self, name): |
| """Run this task in the queue.""" |
| task = self.tasks[name] |
| self.render_output() |
| try: |
| for line in task['function'](*task['args'], **task['kwargs']): |
| if isinstance(line, TaskResult): |
| result = line |
| if result.status_code != 0: |
| raise TaskError(f'status_code: {result.status_code}') |
| else: |
| self.tasks[name]['output'] += line |
| self.tasks[name]['status'] = 'completed' |
| if self.running: |
| self._run_next_task() |
| except TaskError as e: |
| self.tasks[name]['status'] = 'failed' |
| self.tasks[name]['output'] += f'Error: {e}\n' |
| self.render_output() |
| |
| fall_back_tasks = self.tasks[name].get('fall_back_tasks', []) |
| if fall_back_tasks: |
| self.task_queue = [] |
| for t in fall_back_tasks: |
| if isinstance(t, str): |
| self.add_shell_command_task([t]) |
| self._run_next_task() |
| else: |
| if self.running: |
| self.running = False |
| |
| def _run_next_task(self): |
| """Runs the next task in the queue.""" |
| if self.task_queue and self.running: |
| name = self.task_queue.pop(0) |
| self.tasks[name]['status'] = 'running' |
| threading.Thread(target=self.run_task, args=(name,)).start() |
| elif self.running: |
| self.running = False |
| self.render_output() |
| |
| if self.quiet: |
| return |
| |
| print('') |
| print('Run Completed Successfully!') |
| print('') |
| |
| def add_shell_command_task(self, command, fall_back_tasks=None): |
| """Adds a shell command to the task queue.""" |
| self.add_task( |
| command, run_shell_command, command, fall_back_tasks=fall_back_tasks |
| ) |
| |
| def render_output(self): |
| """Prints the output of the tasks as well as a table showing the progres on the task queue.""" |
| if self.quiet: |
| return |
| |
| # os.system('cls' if os.name == 'nt' else 'clear') |
| print(f'{self.output}', end='') |
| for name, command_data in self.tasks.items(): |
| print(f"{command_data['output']}", end='') |
| |
| for name, command_data in self.tasks.items(): |
| status_icon = '.' |
| status_color = '\033[94m' # Blue |
| if command_data['status'] == 'completed': |
| status_icon = '✓' |
| status_color = '\033[32m' # Green |
| elif command_data['status'] == 'running': |
| status_icon = self.running_indicator_chars[self.running_indicator_index] |
| status_color = '\033[32m' # Green |
| elif command_data['status'] == 'failed': |
| status_icon = '✗' |
| status_color = '\033[91m' # Red |
| print(f'{status_color}{status_icon}\033[0m {name}\033[0m') |
| print('-' * 20) |
| |
| |
| def run_shell_command(command, use_stdout=True): |
| """Run a shell command and yield output.""" |
| last_line = '' |
| |
| if use_stdout: |
| with subprocess.Popen( |
| command, |
| shell=True, |
| stdout=sys.stdout, |
| stderr=sys.stderr, |
| text=True, |
| ) as process: |
| status_code = process.wait() |
| yield TaskResult(status_code=status_code) |
| else: |
| with subprocess.Popen( |
| command, |
| shell=True, |
| text=True, |
| ) as process: |
| status_code = process.wait() |
| for line in iter(process.stdout.readline, ''): |
| if line.strip() == last_line: |
| continue |
| last_line = line.strip() |
| yield line |
| process.stdout.flush() |
| process.stdout.close() |
| status_code = process.wait() |
| yield TaskResult(status_code=status_code) |