| #!/usr/bin/env python3 |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # Copyright (c) 2025 Cyril Hrubis <chrubis@suse.cz> |
| # Copyright (c) 2025 Andrea Cervesato <andrea.cervesato@suse.com> |
| """ |
| This script parses JSON results from kirk and LTP metadata in order to |
| calculate timeouts for tests based on the results file. |
| It can also patch tests automatically and replace the calculated timeout. |
| """ |
| |
| import re |
| import os |
| import json |
| import argparse |
| |
| # The test runtime is multiplied by this to get a timeout |
| TIMEOUT_MUL = 1.2 |
| |
| |
| def _sed(fname, expr, replace): |
| """ |
| Pythonic version of sed command. |
| """ |
| content = [] |
| matcher = re.compile(expr) |
| |
| with open(fname, 'r', encoding="utf-8") as data: |
| for line in data: |
| match = matcher.search(line) |
| if not match: |
| content.append(line) |
| else: |
| content.append(replace) |
| |
| with open(fname, 'w', encoding="utf-8") as data: |
| data.writelines(content) |
| |
| |
| def _patch(ltp_dir, fname, new_timeout, override): |
| """ |
| If `override` is True, it patches a test file, searching for timeout and |
| replacing it with `new_timeout`. |
| """ |
| orig_timeout = None |
| file_path = os.path.join(ltp_dir, fname) |
| |
| with open(file_path, 'r', encoding="utf-8") as c_source: |
| matcher = re.compile(r'\s*.timeout\s*=\s*(\d+).') |
| for line in c_source: |
| match = matcher.search(line) |
| if not match: |
| continue |
| |
| timeout = match.group(1) |
| orig_timeout = int(timeout) |
| |
| if orig_timeout: |
| if orig_timeout < new_timeout or override: |
| print(f"CHANGE {fname} timeout {orig_timeout} -> {new_timeout}") |
| _sed(file_path, r".timeout = [0-9]*,\n", |
| f"\t.timeout = {new_timeout},\n") |
| else: |
| print(f"KEEP {fname} timeout {orig_timeout} (new {new_timeout})") |
| else: |
| print(f"ADD {fname} timeout {new_timeout}") |
| _sed(file_path, |
| "static struct tst_test test = {", |
| "static struct tst_test test = {\n" |
| f"\t.timeout = {new_timeout},\n") |
| |
| |
| def _patch_all(ltp_dir, timeouts, override): |
| """ |
| Patch all tests. |
| """ |
| for timeout in timeouts: |
| if timeout['path']: |
| _patch(ltp_dir, timeout['path'], timeout['timeout'], override) |
| |
| |
| def _print_table(timeouts): |
| """ |
| Print the timeouts table. |
| """ |
| timeouts.sort(key=lambda x: x['timeout'], reverse=True) |
| |
| total = 0 |
| |
| print("Old library tests\n-----------------\n") |
| for timeout in timeouts: |
| if not timeout['newlib']: |
| print(f"{timeout['name']:30s} {timeout['timeout']}") |
| total += 1 |
| |
| print(f"\n\t{total} tests in total") |
| |
| total = 0 |
| |
| print("\nNew library tests\n-----------------\n") |
| for timeout in timeouts: |
| if timeout['newlib']: |
| print(f"{timeout['name']:30s} {timeout['timeout']}") |
| total += 1 |
| |
| print(f"\n\t{total} tests in total") |
| |
| |
| def _parse_data(ltp_dir, results_path): |
| """ |
| Parse results data and metadata, then it generates timeouts data. |
| """ |
| timeouts = [] |
| results = None |
| metadata = None |
| |
| with open(results_path, 'r', encoding="utf-8") as file: |
| results = json.load(file) |
| |
| metadata_path = os.path.join(ltp_dir, 'metadata', 'ltp.json') |
| with open(metadata_path, 'r', encoding="utf-8") as file: |
| metadata = json.load(file) |
| |
| for test in results['results']: |
| name = test['test_fqn'] |
| duration = test['test']['duration'] |
| |
| # if test runs for all_filesystems, normalize runtime to one filesystem |
| filesystems = max(1, test['test']['log'].count('TINFO: Formatting /')) |
| |
| # check if test is new library test |
| test_is_newlib = name in metadata['tests'] |
| |
| # store test file path |
| path = None |
| if test_is_newlib: |
| path = metadata['tests'][name]['fname'] |
| |
| test_has_runtime = False |
| if test_is_newlib: |
| # filter out tests with runtime |
| test_has_runtime = 'runtime' in metadata['tests'][name] |
| |
| # timer tests define runtime dynamically in timer library |
| test_has_runtime = 'sample' in metadata['tests'][name] |
| |
| # select tests that does not have runtime and which are executed |
| # for a long time |
| if not test_has_runtime and duration >= 0.5: |
| data = {} |
| data["name"] = name |
| data["timeout"] = int(TIMEOUT_MUL * duration/filesystems + 0.5) |
| data["newlib"] = test_is_newlib |
| data["path"] = path |
| |
| timeouts.append(data) |
| |
| return timeouts |
| |
| |
| def _file_exists(filepath): |
| """ |
| Check if the given file path exists. |
| """ |
| if not os.path.isfile(filepath): |
| raise argparse.ArgumentTypeError( |
| f"The file '{filepath}' does not exist.") |
| return filepath |
| |
| |
| def _dir_exists(dirpath): |
| """ |
| Check if the given directory path exists. |
| """ |
| if not os.path.isdir(dirpath): |
| raise argparse.ArgumentTypeError( |
| f"The directory '{dirpath}' does not exist.") |
| return dirpath |
| |
| |
| def run(): |
| """ |
| Entry point of the script. |
| """ |
| parser = argparse.ArgumentParser( |
| description="Script to calculate LTP tests timeouts") |
| |
| parser.add_argument( |
| '-l', |
| '--ltp-dir', |
| type=_dir_exists, |
| help='LTP source code directory', |
| default='..') |
| |
| parser.add_argument( |
| '-r', |
| '--results', |
| type=_file_exists, |
| required=True, |
| help='kirk results.json file location') |
| |
| parser.add_argument( |
| '-o', |
| '--override', |
| default=False, |
| action='store_true', |
| help='Always override test timeouts') |
| |
| parser.add_argument( |
| '-p', |
| '--patch', |
| default=False, |
| action='store_true', |
| help='Patch tests with updated timeout') |
| |
| parser.add_argument( |
| '-t', |
| '--print-table', |
| default=True, |
| action='store_true', |
| help='Print table with suggested timeouts') |
| |
| args = parser.parse_args() |
| |
| timeouts = _parse_data(args.ltp_dir, args.results) |
| |
| if args.print_table: |
| _print_table(timeouts) |
| |
| if args.patch: |
| _patch_all(args.ltp_dir, timeouts, args.override) |
| |
| |
| if __name__ == "__main__": |
| run() |