#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""End-to-end test for afdo_prof_analysis."""

from __future__ import absolute_import, division, print_function

import json
import os
import shutil
import tempfile
import unittest
from datetime import date

from afdo_tools.bisection import afdo_prof_analysis as analysis


class ObjectWithFields(object):
"""Turns kwargs given to the constructor into fields on an object.
Examples:
x = ObjectWithFields(a=1, b=2)
assert x.a == 1
assert x.b == 2
"""
def __init__(self, **kwargs):
for key, val in kwargs.items():
setattr(self, key, val)


class AfdoProfAnalysisE2ETest(unittest.TestCase):
  """Class for end-to-end testing of AFDO Profile Analysis"""

# nothing significant about the values, just easier to remember even vs odd
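  # Each value holds what follows the function name in the text-format
  # profile (sample counts, then one 'offset: count' pair per line);
  # run_check() below uses analysis.json_to_text() to stitch these dicts
  # back into profile files.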
good_prof = {
'func_a': ':1\n 1: 3\n 3: 5\n 5: 7\n',
'func_b': ':3\n 3: 5\n 5: 7\n 7: 9\n',
'func_c': ':5\n 5: 7\n 7: 9\n 9: 11\n',
'func_d': ':7\n 7: 9\n 9: 11\n 11: 13\n',
'good_func_a': ':11\n',
'good_func_b': ':13\n'
}
bad_prof = {
'func_a': ':2\n 2: 4\n 4: 6\n 6: 8\n',
'func_b': ':4\n 4: 6\n 6: 8\n 8: 10\n',
'func_c': ':6\n 6: 8\n 8: 10\n 10: 12\n',
'func_d': ':8\n 8: 10\n 10: 12\n 12: 14\n',
'bad_func_a': ':12\n',
'bad_func_b': ':14\n'
}
expected = {
'good_only_functions': False,
'bad_only_functions': True,
'bisect_results': {
'ranges': [],
'individuals': ['func_a']
}
}

  def test_afdo_prof_analysis(self):
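    """Checks that individual offenders and then bad ranges are detected."""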
    # Individual issues take precedence by nature of our algorithm, so they
    # should be caught first.
good = self.good_prof.copy()
bad = self.bad_prof.copy()
self.run_check(good, bad, self.expected)

    # Now remove the individual offender and the bad-only functions, and
    # check that the bad range is caught instead.
bad['func_a'] = good['func_a']
bad.pop('bad_func_a')
bad.pop('bad_func_b')
expected_cp = self.expected.copy()
expected_cp['bad_only_functions'] = False
expected_cp['bisect_results'] = {
'individuals': [],
'ranges': [['func_b', 'func_c', 'func_d']]
}
self.run_check(good, bad, expected_cp)

  def test_afdo_prof_state(self):
    """Verifies that resuming from saved state reproduces the first run."""
temp_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
good = self.good_prof.copy()
bad = self.bad_prof.copy()
# add more functions to data
for x in range(400):
good['func_%d' % x] = ''
bad['func_%d' % x] = ''
fd_first, first_result = tempfile.mkstemp(dir=temp_dir)
os.close(fd_first)
fd_state, state_file = tempfile.mkstemp(dir=temp_dir)
os.close(fd_state)
self.run_check(
self.good_prof,
self.bad_prof,
self.expected,
state_file=state_file,
out_file=first_result)
fd_second, second_result = tempfile.mkstemp(dir=temp_dir)
os.close(fd_second)
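    # The first run leaves behind a completed state file named
    # '<state_file>.completed.<date>'; resume the second run from it and
    # check that it reproduces the first run's output.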
completed_state_file = '%s.completed.%s' % (state_file, str(date.today()))
self.run_check(
self.good_prof,
self.bad_prof,
self.expected,
state_file=completed_state_file,
no_resume=False,
out_file=second_result)
with open(first_result) as f:
initial_run = json.load(f)
with open(second_result) as f:
loaded_run = json.load(f)
self.assertEqual(initial_run, loaded_run)

  def test_exit_on_problem_status(self):
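    """Verifies a PROBLEM_STATUS from the external decider raises an error."""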
temp_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
fd_state, state_file = tempfile.mkstemp(dir=temp_dir)
os.close(fd_state)
with self.assertRaises(RuntimeError):
self.run_check(
self.good_prof,
self.bad_prof,
self.expected,
state_file=state_file,
extern_decider='problemstatus_external.sh')

  def test_state_assumption(self):
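    """Verifies resumed runs see the same profiles as one uninterrupted run."""
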
def compare_runs(tmp_dir, first_ctr, second_ctr):
"""Compares given prof versions between first and second run in test."""
first_prof = '%s/.first_run_%d' % (tmp_dir, first_ctr)
second_prof = '%s/.second_run_%d' % (tmp_dir, second_ctr)
with open(first_prof) as f:
first_prof_text = f.read()
with open(second_prof) as f:
second_prof_text = f.read()
self.assertEqual(first_prof_text, second_prof_text)
good_prof = {'func_a': ':1\n3: 3\n5: 7\n'}
bad_prof = {'func_a': ':2\n4: 4\n6: 8\n'}
# add some noise to the profiles; 15 is an arbitrary choice
for x in range(15):
func = 'func_%d' % x
good_prof[func] = ':%d\n' % (x)
bad_prof[func] = ':%d\n' % (x + 1)
expected = {
'bisect_results': {
'ranges': [],
'individuals': ['func_a']
},
'good_only_functions': False,
'bad_only_functions': False
}
    # Use a static temp dir rather than a dynamic one because these files are
    # shared between the bash scripts and this Python test, and the arguments
    # to the bash scripts are fixed by afdo_prof_analysis.py, so it would be
    # difficult to communicate a dynamically generated directory to the
    # scripts.
scripts_tmp_dir = '%s/afdo_test_tmp' % os.getcwd()
os.mkdir(scripts_tmp_dir)
self.addCleanup(shutil.rmtree, scripts_tmp_dir, ignore_errors=True)
# files used in the bash scripts used as external deciders below
# - count_file tracks the current number of calls to the script in total
# - local_count_file tracks the number of calls to the script without
# interruption
count_file = '%s/.count' % scripts_tmp_dir
local_count_file = '%s/.local_count' % scripts_tmp_dir
    # First, run the whole analysis through in one uninterrupted pass.
initial_seed = self.run_check(
good_prof,
bad_prof,
expected,
extern_decider='state_assumption_external.sh')
with open(count_file) as f:
num_calls = int(f.read())
os.remove(count_file) # reset counts for second run
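    # run_check() defaults its state file to afdo_analysis_state.json in $PWD,
    # so clean up the completed copy the analysis leaves behind there.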
finished_state_file = 'afdo_analysis_state.json.completed.%s' % str(
date.today())
self.addCleanup(os.remove, finished_state_file)
    # Run the same analysis, interrupting it on every iteration; break as soon
    # as a run finishes without being interrupted.
for i in range(2 * num_calls + 1):
no_resume_run = (i == 0)
seed = initial_seed if no_resume_run else None
try:
self.run_check(
good_prof,
bad_prof,
expected,
no_resume=no_resume_run,
extern_decider='state_assumption_interrupt.sh',
seed=seed)
break
except RuntimeError:
# script was interrupted, so we restart local count
os.remove(local_count_file)
else:
raise RuntimeError('Test failed -- took too many iterations')
for initial_ctr in range(3): # initial runs unaffected by interruption
compare_runs(scripts_tmp_dir, initial_ctr, initial_ctr)
start = 3
for ctr in range(start, num_calls):
      # The second-run counter advances by 4 for each +1 of the first-run
      # counter: +1 for the call that mirrors the first run's step, +2 for
      # re-running the initial checks on the good and bad profs on each
      # resume, and +1 for the PROBLEM_STATUS call that forces the restart.
compare_runs(scripts_tmp_dir, ctr, 6 + (ctr - start) * 4)

  def run_check(self,
good_prof,
bad_prof,
expected,
state_file=None,
no_resume=True,
out_file=None,
extern_decider=None,
seed=None):
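    """Runs the analysis on the given profiles and checks its report.

    Returns the seed the analysis used so that callers can replay the run.
    """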
temp_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
good_prof_file = '%s/%s' % (temp_dir, 'good_prof.txt')
bad_prof_file = '%s/%s' % (temp_dir, 'bad_prof.txt')
good_prof_text = analysis.json_to_text(good_prof)
bad_prof_text = analysis.json_to_text(bad_prof)
with open(good_prof_file, 'w') as f:
f.write(good_prof_text)
with open(bad_prof_file, 'w') as f:
f.write(bad_prof_text)
dir_path = os.path.dirname(os.path.realpath(__file__)) # dir of this file
external_script = '%s/%s' % (dir_path, extern_decider or 'e2e_external.sh')
# FIXME: This test ideally shouldn't be writing to $PWD
if state_file is None:
state_file = '%s/afdo_analysis_state.json' % os.getcwd()
def rm_state():
try:
os.unlink(state_file)
except OSError:
        # Probably because the file doesn't exist. That's fine.
pass
self.addCleanup(rm_state)
actual = analysis.main(
ObjectWithFields(
good_prof=good_prof_file,
bad_prof=bad_prof_file,
external_decider=external_script,
analysis_output_file=out_file or '/dev/null',
state_file=state_file,
no_resume=no_resume,
remove_state_on_completion=False,
seed=seed,
))
actual_seed = actual.pop('seed') # nothing to check
self.assertEqual(actual, expected)
return actual_seed


if __name__ == '__main__':
unittest.main()