blob: 0b83db1d9b4771b71f1ddfc787dde427eb6f53c2 [file] [log] [blame]
# Copyright 2015, VIXL authors
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of ARM Limited nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections
import multiprocessing
import re
import time
import printer
import thread_pool
REGEXP_MISSING_FEATURES = "Missing features: { ([^,}]+(, [^,}]+)*) }"
class Test(object):
# Shared state for multiprocessing. Ideally the context should be passed with
# arguments, but constraints from the multiprocessing module prevent us from
# doing so: the shared variables (multiprocessing.Value) must be either global
# or static, or no work is started.
n_tests_passed = multiprocessing.Value('i', 0)
n_tests_failed = multiprocessing.Value('i', 0)
n_tests_skipped = multiprocessing.Value('i', 0)
manager = multiprocessing.Manager()
def __init__(self, name, shared, **kwargs):
self.name = name
self.shared = shared
self.args = kwargs
class TestQueue(object):
def __init__(self, prefix = ''):
self.progress_prefix = prefix
self.queue = []
self.tests_skipped = Test.manager.dict()
self.n_known_failures = 0
self.known_failures = collections.Counter()
def AddKnownFailures(self, reason, n_tests):
self.n_known_failures += n_tests
self.known_failures[reason] += n_tests
def AddTest(self, name, **kwargs):
self.queue.append(Test(name, self, **kwargs))
# Run the specified tests.
def Run(self, jobs, verbose, run_function):
def InitGlobals():
# Initialisation.
self.start_time = time.time()
self.n_tests = len(self.queue)
if self.n_tests == 0:
printer.Print('No tests to run.')
return False
Test.n_tests_passed.value = 0
Test.n_tests_failed.value = 0
Test.n_tests_skipped.value = 0
self.tests_skipped.clear()
return True
thread_pool.Multithread(run_function, self.queue, jobs, InitGlobals)
printer.UpdateProgress(self.start_time,
Test.n_tests_passed.value,
Test.n_tests_failed.value,
self.n_tests,
Test.n_tests_skipped.value,
self.n_known_failures,
'== Done ==',
prevent_next_overwrite = True,
prefix = self.progress_prefix)
n_tests_features = 0
features = set()
for reason, n_tests in self.tests_skipped.items():
m = re.match(REGEXP_MISSING_FEATURES, reason)
if m:
if verbose:
printer.Print("%d tests skipped because the following features are not "
"available '%s'" % (n_tests, m.group(1)))
else:
n_tests_features += n_tests
features.update(m.group(1).split(', '))
else:
printer.Print("%d tests skipped because '%s'" % (n_tests, reason))
n_tests_other = 0
if n_tests_features > 0 :
printer.Print("%d tests skipped because the CPU does not support "
"the following features: '%s'" %
(n_tests_features, ", ".join(features)))
for reason, n_tests in self.known_failures.items():
printer.Print("%d tests skipped because '%s'" % (n_tests, reason))
# Empty the queue now that the tests have been run.
self.queue = []
# `0` indicates success
return Test.n_tests_failed.value