blob: 4ef57d25b8ef2233057ef4dbd5646c604dbd5010 [file] [log] [blame]
#!/usr/bin/env python3
import argparse
import pathlib
from common import (
download_reports,
get_testcases,
is_failure,
is_passing_skipped_test,
is_unexpected_success,
key,
open_test_results,
)
"""
Usage: update_failures.py /path/to/dynamo_test_failures.py commit_sha
Best-effort updates the xfail and skip lists in dynamo_test_failures.py
by parsing test reports.
You'll need to provide the commit_sha for the latest commit on a PR
from which we will pull CI test results.
Instructions:
- On your PR, add the "keep-going" label to ensure that all the tests run
and every failure is reported (as opposed to CI stopping on the first
failure). You may need to restart your test jobs by force-pushing to your
branch for CI to pick up the "keep-going" label.
- Wait for all the tests to finish running.
- Find the full SHA of your commit and run this command.
This script requires the `gh` cli. You'll need to install it and then
authenticate with it via `gh auth login` before using this script.
https://docs.github.com/en/github-cli/github-cli/quickstart
"""
def patch_file(filename, unexpected_successes, new_xfails, new_skips, unexpected_skips):
    """Rewrite the expected-failure and skip sets stored in ``filename``.

    The file is read line by line and written back in place. It must contain
    a definition starting with ``dynamo_expected_failures`` followed by a
    ``dynamo_skips`` definition. Each definition is either a multi-line
    literal closed by a line consisting solely of ``}``, or the one-line
    empty form ``<name> = {}``.

    Args:
        filename: Path of the file to patch in place.
        unexpected_successes: Mapping whose values are testcases; lines in
            ``dynamo_expected_failures`` quoting their name are dropped.
        new_xfails: Mapping whose values are testcases appended to
            ``dynamo_expected_failures``.
        new_skips: Mapping whose values are testcases appended to
            ``dynamo_skips``.
        unexpected_skips: Mapping whose values are testcases; lines in
            ``dynamo_skips`` quoting their name are dropped.

    Every testcase must expose an ``attrib`` mapping with ``"classname"`` and
    ``"name"``; testcases being added also need ``"file"``.

    Raises:
        IndexError: If the file ends before either definition, or its closing
            brace, is found.
    """
    with open(filename, "r") as f:
        text = f.readlines()

    def format_name(testcase):
        """Return the ``classname.name`` identifier of a testcase."""
        classname = testcase.attrib["classname"]
        name = testcase.attrib["name"]
        return f"{classname}.{name}"

    def format_entry(testcase):
        """Return the source line that lists a testcase in one of the sets."""
        return f' "{format_name(testcase)}", # {testcase.attrib["file"]}\n'

    def listed_name(names, line):
        """Return the first double-quoted token of ``line`` if it is in ``names``."""
        pieces = line.split('"')
        if len(pieces) < 3:
            # No complete "..." pair on this line.
            return None
        candidate = pieces[1]
        return candidate if candidate in names else None

    def line_at(index, what):
        """Return ``text[index]``, raising a descriptive IndexError past EOF."""
        if index >= len(text):
            raise IndexError(
                f"{filename}: reached end of file before finding {what}"
            )
        return text[index]

    def rewrite_section(index, var_name, names_to_remove, entries_to_add):
        """Copy one set definition into ``new_text``, applying the edits.

        Starts at ``text[index]`` and returns ``(next_index, removed_names)``
        where ``next_index`` is the first line after the definition.
        """
        removed = set()
        while True:
            line = line_at(index, f"the end of {var_name}")
            index += 1
            match = listed_name(names_to_remove, line)
            if match is not None:
                # Drop the entry by not copying its line.
                removed.add(match)
                continue
            if line == f"{var_name} = {{}}\n":
                # One-line empty literal: expand it so entries can be listed.
                new_text.append(f"{var_name} = {{\n")
                new_text.extend(entries_to_add)
                new_text.append("}\n")
                return index, removed
            if line == "}\n":
                # New entries go right before the closing brace.
                new_text.extend(entries_to_add)
                new_text.append(line)
                return index, removed
            new_text.append(line)

    formatted_unexpected_successes = {
        format_name(test) for test in unexpected_successes.values()
    }
    formatted_unexpected_skips = {
        format_name(test) for test in unexpected_skips.values()
    }
    formatted_new_xfails = [format_entry(test) for test in new_xfails.values()]
    formatted_new_skips = [format_entry(test) for test in new_skips.values()]

    new_text = []
    i = 0

    # Everything before the dynamo_expected_failures definition is copied as-is.
    while not line_at(i, "dynamo_expected_failures").startswith(
        "dynamo_expected_failures"
    ):
        new_text.append(text[i])
        i += 1

    # dynamo_expected_failures
    i, covered_unexpected_successes = rewrite_section(
        i,
        "dynamo_expected_failures",
        formatted_unexpected_successes,
        formatted_new_xfails,
    )

    leftover_unexpected_successes = (
        formatted_unexpected_successes - covered_unexpected_successes
    )
    if leftover_unexpected_successes:
        print(
            "WARNING: we were unable to remove these "
            f"{len(leftover_unexpected_successes)} expectedFailures:"
        )
        for stuff in leftover_unexpected_successes:
            print(stuff)

    # dynamo_skips
    i, _ = rewrite_section(
        i, "dynamo_skips", formatted_unexpected_skips, formatted_new_skips
    )

    # Whatever follows the two definitions is copied unchanged.
    new_text.extend(text[i:])

    with open(filename, "w") as f:
        f.writelines(new_text)
def get_intersection_and_outside(a_dict, b_dict):
    """Split the entries of two dicts by whether their key is shared.

    Returns a pair ``(shared, exclusive)``: ``shared`` holds the keys present
    in both dicts, ``exclusive`` the keys present in exactly one of them.
    A value is taken from ``a_dict`` when the key is there, else from
    ``b_dict``.
    """
    keys_a = set(a_dict.keys())
    keys_b = set(b_dict.keys())
    shared_keys = keys_a & keys_b
    exclusive_keys = (keys_a | keys_b) - shared_keys

    def pick(keys):
        # a_dict wins whenever it has the key.
        return {k: (a_dict[k] if k in a_dict else b_dict[k]) for k in keys}

    return pick(shared_keys), pick(exclusive_keys)
def update(filename, py38_dir, py311_dir, also_remove_skips):
    """Derive list changes from two sets of test results and patch ``filename``.

    Results are read from ``py38_dir`` and ``py311_dir``. Failures seen in
    both become new xfails; failures or unexpected successes seen in only one
    of them become new skips. When ``also_remove_skips`` is true, tests that
    are "passing skipped" in both are handed to ``patch_file`` for removal
    from the skip list. Returns whatever ``patch_file`` returns.
    """

    def read_test_results(directory):
        """Return (unexpected successes, failures, passing skipped tests).

        Each element is a dict mapping ``key(test)`` to the testcase.
        """
        testcases = get_testcases(open_test_results(directory))

        def select(predicate):
            return {key(test): test for test in testcases if predicate(test)}

        return (
            select(is_unexpected_success),
            select(is_failure),
            select(is_passing_skipped_test),
        )

    py38_successes, py38_failures, py38_passing_skips = read_test_results(py38_dir)
    py311_successes, py311_failures, py311_passing_skips = read_test_results(
        py311_dir
    )

    # Union of both runs; the py311 entry wins on a shared key.
    unexpected_successes = dict(py38_successes)
    unexpected_successes.update(py311_successes)

    # An unexpected success in only one of the two runs is turned into a skip.
    skips = get_intersection_and_outside(py38_successes, py311_successes)[1]
    xfails, more_skips = get_intersection_and_outside(py38_failures, py311_failures)

    unexpected_skips = {}
    if also_remove_skips:
        unexpected_skips = get_intersection_and_outside(
            py38_passing_skips, py311_passing_skips
        )[0]

    all_skips = dict(skips)
    all_skips.update(more_skips)

    print(
        f"Discovered {len(unexpected_successes)} new unexpected successes, "
        f"{len(xfails)} new xfails, {len(all_skips)} new skips, {len(unexpected_skips)} new unexpected skips"
    )
    return patch_file(
        filename, unexpected_successes, xfails, all_skips, unexpected_skips
    )
if __name__ == "__main__":
    # Command-line entry point: parse arguments, fetch the reports for the
    # given commit, then patch the failures file.
    parser = argparse.ArgumentParser(
        prog="update_dynamo_test_failures",
        description="Read from logs and update the dynamo_test_failures file",
    )
    # dynamo_test_failures path
    parser.add_argument(
        "filename",
        nargs="?",
        default=str(
            pathlib.Path(__file__).absolute().parent.parent.parent
            / "torch/testing/_internal/dynamo_test_failures.py"
        ),
        help="Optional path to dynamo_test_failures.py",
    )
    parser.add_argument(
        "commit",
        help=(
            "The commit sha for the latest commit on a PR from which we will "
            "pull CI test results, e.g. 7e5f597aeeba30c390c05f7d316829b3798064a5"
        ),
    )
    parser.add_argument(
        "--also-remove-skips",
        help="Also attempt to remove skips. WARNING: does not guard against test flakiness",
        action="store_true",
    )
    args = parser.parse_args()
    # Validate explicitly rather than with `assert`, which is stripped when
    # Python runs with -O; parser.error prints usage and exits with status 2.
    if not pathlib.Path(args.filename).exists():
        parser.error(f"file does not exist: {args.filename}")
    # NOTE(review): presumably these are the locations of the downloaded
    # reports for each job; they are passed to update() as py38_dir/py311_dir.
    dynamo38, dynamo311 = download_reports(args.commit, ("dynamo38", "dynamo311"))
    update(args.filename, dynamo38, dynamo311, args.also_remove_skips)