blob: 965547ccf1480c01d0eda10c51374f30d1201c72 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2024 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Classes to help coordinate running tasks and displaying progress."""
import subprocess
import sys
import threading
from .errors import TaskError
class Task:
"""Defines a task to be run by the task_runner."""
def __init__(self, cmd, fall_back_tasks=None):
self.cmd = cmd
self.fall_back_tasks = fall_back_tasks
class TaskResult:
"""Holds result and status code of a task."""
def __init__(self, status_code, result=''):
self.status_code = status_code
self.result = result
class TaskRunner:
"""Runs a set of tasks and displays progress."""
def __init__(self):
self.tasks = {}
self.task_queue = []
self.running = False
# UI
self.quiet = False
self.output = ''
self.running_indicator_thread = None
self.running_indicator_chars = ['→']
# self.running_indicator_chars = ['◢', '◣', '◤', '◥']
self.running_indicator_index = 0
self.stop_event = threading.Event()
def add_task(self, name, function, *args, fall_back_tasks=None, **kwargs):
"""Adds a task to the queue."""
self.tasks[name] = {
'status': 'pending',
'function': function,
'output': '',
'args': args,
'kwargs': kwargs,
'fall_back_tasks': fall_back_tasks,
}
self.task_queue.append(name)
def start(self):
"""Starts running all the tasks in the queue."""
print('Running Plan:')
self.running = True
self._run_next_task()
def run_task(self, name):
"""Run this task in the queue."""
task = self.tasks[name]
self.render_output()
try:
for line in task['function'](*task['args'], **task['kwargs']):
if isinstance(line, TaskResult):
result = line
if result.status_code != 0:
raise TaskError(f'status_code: {result.status_code}')
else:
self.tasks[name]['output'] += line
self.tasks[name]['status'] = 'completed'
if self.running:
self._run_next_task()
except TaskError as e:
self.tasks[name]['status'] = 'failed'
self.tasks[name]['output'] += f'Error: {e}\n'
self.render_output()
fall_back_tasks = self.tasks[name].get('fall_back_tasks', [])
if fall_back_tasks:
self.task_queue = []
for t in fall_back_tasks:
if isinstance(t, str):
self.add_shell_command_task([t])
self._run_next_task()
else:
if self.running:
self.running = False
def _run_next_task(self):
"""Runs the next task in the queue."""
if self.task_queue and self.running:
name = self.task_queue.pop(0)
self.tasks[name]['status'] = 'running'
threading.Thread(target=self.run_task, args=(name,)).start()
elif self.running:
self.running = False
self.render_output()
if self.quiet:
return
print('')
print('Run Completed Successfully!')
print('')
def add_shell_command_task(self, command, fall_back_tasks=None):
"""Adds a shell command to the task queue."""
self.add_task(
command, run_shell_command, command, fall_back_tasks=fall_back_tasks
)
def render_output(self):
"""Prints the output of the tasks as well as a table showing the progres on the task queue."""
if self.quiet:
return
# os.system('cls' if os.name == 'nt' else 'clear')
print(f'{self.output}', end='')
for name, command_data in self.tasks.items():
print(f"{command_data['output']}", end='')
for name, command_data in self.tasks.items():
status_icon = '.'
status_color = '\033[94m' # Blue
if command_data['status'] == 'completed':
status_icon = '✓'
status_color = '\033[32m' # Green
elif command_data['status'] == 'running':
status_icon = self.running_indicator_chars[self.running_indicator_index]
status_color = '\033[32m' # Green
elif command_data['status'] == 'failed':
status_icon = '✗'
status_color = '\033[91m' # Red
print(f'{status_color}{status_icon}\033[0m {name}\033[0m')
print('-' * 20)
def run_shell_command(command, use_stdout=True):
"""Run a shell command and yield output."""
last_line = ''
if use_stdout:
with subprocess.Popen(
command,
shell=True,
stdout=sys.stdout,
stderr=sys.stderr,
text=True,
) as process:
status_code = process.wait()
yield TaskResult(status_code=status_code)
else:
with subprocess.Popen(
command,
shell=True,
text=True,
) as process:
status_code = process.wait()
for line in iter(process.stdout.readline, ''):
if line.strip() == last_line:
continue
last_line = line.strip()
yield line
process.stdout.flush()
process.stdout.close()
status_code = process.wait()
yield TaskResult(status_code=status_code)