blob: 808e4185ef0a25f53d9eb348d4da5a079c6e8d3b [file] [log] [blame]
import os, sys, unittest, getopt, time
use_resources = []
class ResourceDenied(Exception):
"""Test skipped because it requested a disallowed resource.
This is raised when a test calls requires() for a resource that
has not be enabled. Resources are defined by test modules.
"""
def is_resource_enabled(resource):
"""Test whether a resource is enabled.
If the caller's module is __main__ then automatically return True."""
if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
return True
result = use_resources is not None and \
(resource in use_resources or "*" in use_resources)
if not result:
_unavail[resource] = None
return result
_unavail = {}
def requires(resource, msg=None):
"""Raise ResourceDenied if the specified resource is not available.
If the caller's module is __main__ then automatically return True."""
# see if the caller's module is __main__ - if so, treat as if
# the resource was set
if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the `%s' resource not enabled" % resource
raise ResourceDenied(msg)
def find_package_modules(package, mask):
import fnmatch
if (hasattr(package, "__loader__") and
hasattr(package.__loader__, '_files')):
path = package.__name__.replace(".", os.path.sep)
mask = os.path.join(path, mask)
for fnm in package.__loader__._files.iterkeys():
if fnmatch.fnmatchcase(fnm, mask):
yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
else:
path = package.__path__[0]
for fnm in os.listdir(path):
if fnmatch.fnmatchcase(fnm, mask):
yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])
def get_tests(package, mask, verbosity, exclude=()):
"""Return a list of skipped test modules, and a list of test cases."""
tests = []
skipped = []
for modname in find_package_modules(package, mask):
if modname.split(".")[-1] in exclude:
skipped.append(modname)
if verbosity > 1:
print >> sys.stderr, "Skipped %s: excluded" % modname
continue
try:
mod = __import__(modname, globals(), locals(), ['*'])
except (ResourceDenied, unittest.SkipTest) as detail:
skipped.append(modname)
if verbosity > 1:
print >> sys.stderr, "Skipped %s: %s" % (modname, detail)
continue
for name in dir(mod):
if name.startswith("_"):
continue
o = getattr(mod, name)
if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
tests.append(o)
return skipped, tests
def usage():
print __doc__
return 1
def test_with_refcounts(runner, verbosity, testcase):
"""Run testcase several times, tracking reference counts."""
import gc
import ctypes
ptc = ctypes._pointer_type_cache.copy()
cfc = ctypes._c_functype_cache.copy()
wfc = ctypes._win_functype_cache.copy()
# when searching for refcount leaks, we have to manually reset any
# caches that ctypes has.
def cleanup():
ctypes._pointer_type_cache = ptc.copy()
ctypes._c_functype_cache = cfc.copy()
ctypes._win_functype_cache = wfc.copy()
gc.collect()
test = unittest.makeSuite(testcase)
for i in range(5):
rc = sys.gettotalrefcount()
runner.run(test)
cleanup()
COUNT = 5
refcounts = [None] * COUNT
for i in range(COUNT):
rc = sys.gettotalrefcount()
runner.run(test)
cleanup()
refcounts[i] = sys.gettotalrefcount() - rc
if filter(None, refcounts):
print "%s leaks:\n\t" % testcase, refcounts
elif verbosity:
print "%s: ok." % testcase
class TestRunner(unittest.TextTestRunner):
def run(self, test, skipped):
"Run the given test case or test suite."
# Same as unittest.TextTestRunner.run, except that it reports
# skipped tests.
result = self._makeResult()
startTime = time.time()
test(result)
stopTime = time.time()
timeTaken = stopTime - startTime
result.printErrors()
self.stream.writeln(result.separator2)
run = result.testsRun
if _unavail: #skipped:
requested = _unavail.keys()
requested.sort()
self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
(run, run != 1 and "s" or "", timeTaken,
len(skipped),
len(skipped) != 1 and "s" or ""))
self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
else:
self.stream.writeln("Ran %d test%s in %.3fs" %
(run, run != 1 and "s" or "", timeTaken))
self.stream.writeln()
if not result.wasSuccessful():
self.stream.write("FAILED (")
failed, errored = map(len, (result.failures, result.errors))
if failed:
self.stream.write("failures=%d" % failed)
if errored:
if failed: self.stream.write(", ")
self.stream.write("errors=%d" % errored)
self.stream.writeln(")")
else:
self.stream.writeln("OK")
return result
def main(*packages):
try:
opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
except getopt.error:
return usage()
verbosity = 1
search_leaks = False
exclude = []
for flag, value in opts:
if flag == "-q":
verbosity -= 1
elif flag == "-v":
verbosity += 1
elif flag == "-r":
try:
sys.gettotalrefcount
except AttributeError:
print >> sys.stderr, "-r flag requires Python debug build"
return -1
search_leaks = True
elif flag == "-u":
use_resources.extend(value.split(","))
elif flag == "-x":
exclude.extend(value.split(","))
mask = "test_*.py"
if args:
mask = args[0]
for package in packages:
run_tests(package, mask, verbosity, search_leaks, exclude)
def run_tests(package, mask, verbosity, search_leaks, exclude):
skipped, testcases = get_tests(package, mask, verbosity, exclude)
runner = TestRunner(verbosity=verbosity)
suites = [unittest.makeSuite(o) for o in testcases]
suite = unittest.TestSuite(suites)
result = runner.run(suite, skipped)
if search_leaks:
# hunt for refcount leaks
runner = BasicTestRunner()
for t in testcases:
test_with_refcounts(runner, verbosity, t)
return bool(result.errors)
class BasicTestRunner:
def run(self, test):
result = unittest.TestResult()
test(result)
return result