| # -*- test-case-name: twisted.trial.test.test_tests -*- |
| # Copyright (c) 2001-2009 Twisted Matrix Laboratories. |
| # See LICENSE for details. |
| |
| """ |
| Things likely to be used by writers of unit tests. |
| |
| Maintainer: Jonathan Lange |
| """ |
| |
| |
| import doctest, inspect |
| import os, warnings, sys, tempfile, gc, types |
| from pprint import pformat |
| try: |
| from dis import findlinestarts as _findlinestarts |
| except ImportError: |
| # Definition copied from Python's Lib/dis.py - findlinestarts was not |
| # available in Python 2.3. This function is copyright Python Software |
| # Foundation, released under the Python license: |
| # http://www.python.org/psf/license/ |
| def _findlinestarts(code): |
| """Find the offsets in a byte code which are start of lines in the source. |
| |
| Generate pairs (offset, lineno) as described in Python/compile.c. |
| |
| """ |
| byte_increments = [ord(c) for c in code.co_lnotab[0::2]] |
| line_increments = [ord(c) for c in code.co_lnotab[1::2]] |
| |
| lastlineno = None |
| lineno = code.co_firstlineno |
| addr = 0 |
| for byte_incr, line_incr in zip(byte_increments, line_increments): |
| if byte_incr: |
| if lineno != lastlineno: |
| yield (addr, lineno) |
| lastlineno = lineno |
| addr += byte_incr |
| lineno += line_incr |
| if lineno != lastlineno: |
| yield (addr, lineno) |
| |
| from twisted.internet import defer, utils |
| from twisted.python import components, failure, log, monkey |
| from twisted.python.deprecate import getDeprecationWarningString |
| |
| from twisted.trial import itrial, reporter, util |
| |
| pyunit = __import__('unittest') |
| |
| from zope.interface import implements |
| |
| |
| |
| class SkipTest(Exception): |
| """ |
| Raise this (with a reason) to skip the current test. You may also set |
| method.skip to a reason string to skip it, or set class.skip to skip the |
| entire TestCase. |
| """ |
| |
| |
| class FailTest(AssertionError): |
| """Raised to indicate the current test has failed to pass.""" |
| |
| |
| class Todo(object): |
| """ |
| Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo' |
| are reported differently in Trial L{TestResult}s. If todo'd tests fail, |
| they do not fail the suite and the errors are reported in a separate |
| category. If todo'd tests succeed, Trial L{TestResult}s will report an |
| unexpected success. |
| """ |
| |
| def __init__(self, reason, errors=None): |
| """ |
| @param reason: A string explaining why the test is marked 'todo' |
| |
| @param errors: An iterable of exception types that the test is |
| expected to raise. If one of these errors is raised by the test, it |
| will be trapped. Raising any other kind of error will fail the test. |
| If C{None} is passed, then all errors will be trapped. |
| """ |
| self.reason = reason |
| self.errors = errors |
| |
| def __repr__(self): |
| return "<Todo reason=%r errors=%r>" % (self.reason, self.errors) |
| |
| def expected(self, failure): |
| """ |
| @param failure: A L{twisted.python.failure.Failure}. |
| |
| @return: C{True} if C{failure} is expected, C{False} otherwise. |
| """ |
| if self.errors is None: |
| return True |
| for error in self.errors: |
| if failure.check(error): |
| return True |
| return False |
| |
| |
| def makeTodo(value): |
| """ |
| Return a L{Todo} object built from C{value}. |
| |
| If C{value} is a string, return a Todo that expects any exception with |
| C{value} as a reason. If C{value} is a tuple, the second element is used |
| as the reason and the first element as the excepted error(s). |
| |
| @param value: A string or a tuple of C{(errors, reason)}, where C{errors} |
| is either a single exception class or an iterable of exception classes. |
| |
| @return: A L{Todo} object. |
| """ |
| if isinstance(value, str): |
| return Todo(reason=value) |
| if isinstance(value, tuple): |
| errors, reason = value |
| try: |
| errors = list(errors) |
| except TypeError: |
| errors = [errors] |
| return Todo(reason=reason, errors=errors) |
| |
| |
| |
| class _Warning(object): |
| """ |
| A L{_Warning} instance represents one warning emitted through the Python |
| warning system (L{warnings}). This is used to insulate callers of |
| L{_collectWarnings} from changes to the Python warnings system which might |
| otherwise require changes to the warning objects that function passes to |
| the observer object it accepts. |
| |
| @ivar message: The string which was passed as the message parameter to |
| L{warnings.warn}. |
| |
| @ivar category: The L{Warning} subclass which was passed as the category |
| parameter to L{warnings.warn}. |
| |
| @ivar filename: The name of the file containing the definition of the code |
| object which was C{stacklevel} frames above the call to |
| L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} |
| parameter passed to L{warnings.warn}. |
| |
| @ivar lineno: The source line associated with the active instruction of the |
| code object object which was C{stacklevel} frames above the call to |
| L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} |
| parameter passed to L{warnings.warn}. |
| """ |
| def __init__(self, message, category, filename, lineno): |
| self.message = message |
| self.category = category |
| self.filename = filename |
| self.lineno = lineno |
| |
| |
| |
| def _collectWarnings(observeWarning, f, *args, **kwargs): |
| """ |
| Call C{f} with C{args} positional arguments and C{kwargs} keyword arguments |
| and collect all warnings which are emitted as a result in a list. |
| |
| @param observeWarning: A callable which will be invoked with a L{_Warning} |
| instance each time a warning is emitted. |
| |
| @return: The return value of C{f(*args, **kwargs)}. |
| """ |
| def showWarning(message, category, filename, lineno, file=None, line=None): |
| assert isinstance(message, Warning) |
| observeWarning(_Warning( |
| message.args[0], category, filename, lineno)) |
| |
| # Disable the per-module cache for every module otherwise if the warning |
| # which the caller is expecting us to collect was already emitted it won't |
| # be re-emitted by the call to f which happens below. |
| for v in sys.modules.itervalues(): |
| if v is not None: |
| try: |
| v.__warningregistry__ = None |
| except: |
| # Don't specify a particular exception type to handle in case |
| # some wacky object raises some wacky exception in response to |
| # the setattr attempt. |
| pass |
| |
| origFilters = warnings.filters[:] |
| origShow = warnings.showwarning |
| warnings.simplefilter('always') |
| try: |
| warnings.showwarning = showWarning |
| result = f(*args, **kwargs) |
| finally: |
| warnings.filters[:] = origFilters |
| warnings.showwarning = origShow |
| return result |
| |
| |
| |
| class _Assertions(pyunit.TestCase, object): |
| """ |
| Replaces many of the built-in TestCase assertions. In general, these |
| assertions provide better error messages and are easier to use in |
| callbacks. Also provides new assertions such as L{failUnlessFailure}. |
| |
| Although the tests are defined as 'failIf*' and 'failUnless*', they can |
| also be called as 'assertNot*' and 'assert*'. |
| """ |
| |
| def fail(self, msg=None): |
| """ |
| Absolutely fail the test. Do not pass go, do not collect $200. |
| |
| @param msg: the message that will be displayed as the reason for the |
| failure |
| """ |
| raise self.failureException(msg) |
| |
| def failIf(self, condition, msg=None): |
| """ |
| Fail the test if C{condition} evaluates to True. |
| |
| @param condition: any object that defines __nonzero__ |
| """ |
| if condition: |
| raise self.failureException(msg) |
| return condition |
| assertNot = assertFalse = failUnlessFalse = failIf |
| |
| def failUnless(self, condition, msg=None): |
| """ |
| Fail the test if C{condition} evaluates to False. |
| |
| @param condition: any object that defines __nonzero__ |
| """ |
| if not condition: |
| raise self.failureException(msg) |
| return condition |
| assert_ = assertTrue = failUnlessTrue = failUnless |
| |
| def failUnlessRaises(self, exception, f, *args, **kwargs): |
| """ |
| Fail the test unless calling the function C{f} with the given |
| C{args} and C{kwargs} raises C{exception}. The failure will report |
| the traceback and call stack of the unexpected exception. |
| |
| @param exception: exception type that is to be expected |
| @param f: the function to call |
| |
| @return: The raised exception instance, if it is of the given type. |
| @raise self.failureException: Raised if the function call does |
| not raise an exception or if it raises an exception of a |
| different type. |
| """ |
| try: |
| result = f(*args, **kwargs) |
| except exception, inst: |
| return inst |
| except: |
| raise self.failureException('%s raised instead of %s:\n %s' |
| % (sys.exc_info()[0], |
| exception.__name__, |
| failure.Failure().getTraceback())) |
| else: |
| raise self.failureException('%s not raised (%r returned)' |
| % (exception.__name__, result)) |
| assertRaises = failUnlessRaises |
| |
| def failUnlessEqual(self, first, second, msg=''): |
| """ |
| Fail the test if C{first} and C{second} are not equal. |
| |
| @param msg: A string describing the failure that's included in the |
| exception. |
| """ |
| if not first == second: |
| if msg is None: |
| msg = '' |
| if len(msg) > 0: |
| msg += '\n' |
| raise self.failureException( |
| '%snot equal:\na = %s\nb = %s\n' |
| % (msg, pformat(first), pformat(second))) |
| return first |
| assertEqual = assertEquals = failUnlessEquals = failUnlessEqual |
| |
| def failUnlessIdentical(self, first, second, msg=None): |
| """ |
| Fail the test if C{first} is not C{second}. This is an |
| obect-identity-equality test, not an object equality |
| (i.e. C{__eq__}) test. |
| |
| @param msg: if msg is None, then the failure message will be |
| '%r is not %r' % (first, second) |
| """ |
| if first is not second: |
| raise self.failureException(msg or '%r is not %r' % (first, second)) |
| return first |
| assertIdentical = failUnlessIdentical |
| |
| def failIfIdentical(self, first, second, msg=None): |
| """ |
| Fail the test if C{first} is C{second}. This is an |
| obect-identity-equality test, not an object equality |
| (i.e. C{__eq__}) test. |
| |
| @param msg: if msg is None, then the failure message will be |
| '%r is %r' % (first, second) |
| """ |
| if first is second: |
| raise self.failureException(msg or '%r is %r' % (first, second)) |
| return first |
| assertNotIdentical = failIfIdentical |
| |
| def failIfEqual(self, first, second, msg=None): |
| """ |
| Fail the test if C{first} == C{second}. |
| |
| @param msg: if msg is None, then the failure message will be |
| '%r == %r' % (first, second) |
| """ |
| if not first != second: |
| raise self.failureException(msg or '%r == %r' % (first, second)) |
| return first |
| assertNotEqual = assertNotEquals = failIfEquals = failIfEqual |
| |
| def failUnlessIn(self, containee, container, msg=None): |
| """ |
| Fail the test if C{containee} is not found in C{container}. |
| |
| @param containee: the value that should be in C{container} |
| @param container: a sequence type, or in the case of a mapping type, |
| will follow semantics of 'if key in dict.keys()' |
| @param msg: if msg is None, then the failure message will be |
| '%r not in %r' % (first, second) |
| """ |
| if containee not in container: |
| raise self.failureException(msg or "%r not in %r" |
| % (containee, container)) |
| return containee |
| assertIn = failUnlessIn |
| |
| def failIfIn(self, containee, container, msg=None): |
| """ |
| Fail the test if C{containee} is found in C{container}. |
| |
| @param containee: the value that should not be in C{container} |
| @param container: a sequence type, or in the case of a mapping type, |
| will follow semantics of 'if key in dict.keys()' |
| @param msg: if msg is None, then the failure message will be |
| '%r in %r' % (first, second) |
| """ |
| if containee in container: |
| raise self.failureException(msg or "%r in %r" |
| % (containee, container)) |
| return containee |
| assertNotIn = failIfIn |
| |
| def failIfAlmostEqual(self, first, second, places=7, msg=None): |
| """ |
| Fail if the two objects are equal as determined by their |
| difference rounded to the given number of decimal places |
| (default 7) and comparing to zero. |
| |
| @note: decimal places (from zero) is usually not the same |
| as significant digits (measured from the most |
| signficant digit). |
| |
| @note: included for compatiblity with PyUnit test cases |
| """ |
| if round(second-first, places) == 0: |
| raise self.failureException(msg or '%r == %r within %r places' |
| % (first, second, places)) |
| return first |
| assertNotAlmostEqual = assertNotAlmostEquals = failIfAlmostEqual |
| failIfAlmostEquals = failIfAlmostEqual |
| |
| def failUnlessAlmostEqual(self, first, second, places=7, msg=None): |
| """ |
| Fail if the two objects are unequal as determined by their |
| difference rounded to the given number of decimal places |
| (default 7) and comparing to zero. |
| |
| @note: decimal places (from zero) is usually not the same |
| as significant digits (measured from the most |
| signficant digit). |
| |
| @note: included for compatiblity with PyUnit test cases |
| """ |
| if round(second-first, places) != 0: |
| raise self.failureException(msg or '%r != %r within %r places' |
| % (first, second, places)) |
| return first |
| assertAlmostEqual = assertAlmostEquals = failUnlessAlmostEqual |
| failUnlessAlmostEquals = failUnlessAlmostEqual |
| |
| def failUnlessApproximates(self, first, second, tolerance, msg=None): |
| """ |
| Fail if C{first} - C{second} > C{tolerance} |
| |
| @param msg: if msg is None, then the failure message will be |
| '%r ~== %r' % (first, second) |
| """ |
| if abs(first - second) > tolerance: |
| raise self.failureException(msg or "%s ~== %s" % (first, second)) |
| return first |
| assertApproximates = failUnlessApproximates |
| |
| def failUnlessFailure(self, deferred, *expectedFailures): |
| """ |
| Fail if C{deferred} does not errback with one of C{expectedFailures}. |
| Returns the original Deferred with callbacks added. You will need |
| to return this Deferred from your test case. |
| """ |
| def _cb(ignore): |
| raise self.failureException( |
| "did not catch an error, instead got %r" % (ignore,)) |
| |
| def _eb(failure): |
| if failure.check(*expectedFailures): |
| return failure.value |
| else: |
| output = ('\nExpected: %r\nGot:\n%s' |
| % (expectedFailures, str(failure))) |
| raise self.failureException(output) |
| return deferred.addCallbacks(_cb, _eb) |
| assertFailure = failUnlessFailure |
| |
| def failUnlessSubstring(self, substring, astring, msg=None): |
| """ |
| Fail if C{substring} does not exist within C{astring}. |
| """ |
| return self.failUnlessIn(substring, astring, msg) |
| assertSubstring = failUnlessSubstring |
| |
| def failIfSubstring(self, substring, astring, msg=None): |
| """ |
| Fail if C{astring} contains C{substring}. |
| """ |
| return self.failIfIn(substring, astring, msg) |
| assertNotSubstring = failIfSubstring |
| |
| def failUnlessWarns(self, category, message, filename, f, |
| *args, **kwargs): |
| """ |
| Fail if the given function doesn't generate the specified warning when |
| called. It calls the function, checks the warning, and forwards the |
| result of the function if everything is fine. |
| |
| @param category: the category of the warning to check. |
| @param message: the output message of the warning to check. |
| @param filename: the filename where the warning should come from. |
| @param f: the function which is supposed to generate the warning. |
| @type f: any callable. |
| @param args: the arguments to C{f}. |
| @param kwargs: the keywords arguments to C{f}. |
| |
| @return: the result of the original function C{f}. |
| """ |
| warningsShown = [] |
| result = _collectWarnings(warningsShown.append, f, *args, **kwargs) |
| |
| if not warningsShown: |
| self.fail("No warnings emitted") |
| first = warningsShown[0] |
| for other in warningsShown[1:]: |
| if ((other.message, other.category) |
| != (first.message, first.category)): |
| self.fail("Can't handle different warnings") |
| self.assertEqual(first.message, message) |
| self.assertIdentical(first.category, category) |
| |
| # Use starts with because of .pyc/.pyo issues. |
| self.failUnless( |
| filename.startswith(first.filename), |
| 'Warning in %r, expected %r' % (first.filename, filename)) |
| |
| # It would be nice to be able to check the line number as well, but |
| # different configurations actually end up reporting different line |
| # numbers (generally the variation is only 1 line, but that's enough |
| # to fail the test erroneously...). |
| # self.assertEqual(lineno, xxx) |
| |
| return result |
| assertWarns = failUnlessWarns |
| |
| def failUnlessIsInstance(self, instance, classOrTuple): |
| """ |
| Fail if C{instance} is not an instance of the given class or of |
| one of the given classes. |
| |
| @param instance: the object to test the type (first argument of the |
| C{isinstance} call). |
| @type instance: any. |
| @param classOrTuple: the class or classes to test against (second |
| argument of the C{isinstance} call). |
| @type classOrTuple: class, type, or tuple. |
| """ |
| if not isinstance(instance, classOrTuple): |
| self.fail("%r is not an instance of %s" % (instance, classOrTuple)) |
| assertIsInstance = failUnlessIsInstance |
| |
| def failIfIsInstance(self, instance, classOrTuple): |
| """ |
| Fail if C{instance} is not an instance of the given class or of |
| one of the given classes. |
| |
| @param instance: the object to test the type (first argument of the |
| C{isinstance} call). |
| @type instance: any. |
| @param classOrTuple: the class or classes to test against (second |
| argument of the C{isinstance} call). |
| @type classOrTuple: class, type, or tuple. |
| """ |
| if isinstance(instance, classOrTuple): |
| self.fail("%r is an instance of %s" % (instance, classOrTuple)) |
| assertNotIsInstance = failIfIsInstance |
| |
| |
| class _LogObserver(object): |
| """ |
| Observes the Twisted logs and catches any errors. |
| |
| @ivar _errors: A C{list} of L{Failure} instances which were received as |
| error events from the Twisted logging system. |
| |
| @ivar _added: A C{int} giving the number of times C{_add} has been called |
| less the number of times C{_remove} has been called; used to only add |
| this observer to the Twisted logging since once, regardless of the |
| number of calls to the add method. |
| |
| @ivar _ignored: A C{list} of exception types which will not be recorded. |
| """ |
| |
| def __init__(self): |
| self._errors = [] |
| self._added = 0 |
| self._ignored = [] |
| |
| |
| def _add(self): |
| if self._added == 0: |
| log.addObserver(self.gotEvent) |
| self._oldFE, log._flushErrors = (log._flushErrors, self.flushErrors) |
| self._oldIE, log._ignore = (log._ignore, self._ignoreErrors) |
| self._oldCI, log._clearIgnores = (log._clearIgnores, |
| self._clearIgnores) |
| self._added += 1 |
| |
| def _remove(self): |
| self._added -= 1 |
| if self._added == 0: |
| log.removeObserver(self.gotEvent) |
| log._flushErrors = self._oldFE |
| log._ignore = self._oldIE |
| log._clearIgnores = self._oldCI |
| |
| |
| def _ignoreErrors(self, *errorTypes): |
| """ |
| Do not store any errors with any of the given types. |
| """ |
| self._ignored.extend(errorTypes) |
| |
| |
| def _clearIgnores(self): |
| """ |
| Stop ignoring any errors we might currently be ignoring. |
| """ |
| self._ignored = [] |
| |
| |
| def flushErrors(self, *errorTypes): |
| """ |
| Flush errors from the list of caught errors. If no arguments are |
| specified, remove all errors. If arguments are specified, only remove |
| errors of those types from the stored list. |
| """ |
| if errorTypes: |
| flushed = [] |
| remainder = [] |
| for f in self._errors: |
| if f.check(*errorTypes): |
| flushed.append(f) |
| else: |
| remainder.append(f) |
| self._errors = remainder |
| else: |
| flushed = self._errors |
| self._errors = [] |
| return flushed |
| |
| |
| def getErrors(self): |
| """ |
| Return a list of errors caught by this observer. |
| """ |
| return self._errors |
| |
| |
| def gotEvent(self, event): |
| """ |
| The actual observer method. Called whenever a message is logged. |
| |
| @param event: A dictionary containing the log message. Actual |
| structure undocumented (see source for L{twisted.python.log}). |
| """ |
| if event.get('isError', False) and 'failure' in event: |
| f = event['failure'] |
| if len(self._ignored) == 0 or not f.check(*self._ignored): |
| self._errors.append(f) |
| |
| |
| |
| _logObserver = _LogObserver() |
| |
| _wait_is_running = [] |
| |
| class TestCase(_Assertions): |
| """ |
| A unit test. The atom of the unit testing universe. |
| |
| This class extends C{unittest.TestCase} from the standard library. The |
| main feature is the ability to return C{Deferred}s from tests and fixture |
| methods and to have the suite wait for those C{Deferred}s to fire. |
| |
| To write a unit test, subclass C{TestCase} and define a method (say, |
| 'test_foo') on the subclass. To run the test, instantiate your subclass |
| with the name of the method, and call L{run} on the instance, passing a |
| L{TestResult} object. |
| |
| The C{trial} script will automatically find any C{TestCase} subclasses |
| defined in modules beginning with 'test_' and construct test cases for all |
| methods beginning with 'test'. |
| |
| If an error is logged during the test run, the test will fail with an |
| error. See L{log.err}. |
| |
| @ivar failureException: An exception class, defaulting to C{FailTest}. If |
| the test method raises this exception, it will be reported as a failure, |
| rather than an exception. All of the assertion methods raise this if the |
| assertion fails. |
| |
| @ivar skip: C{None} or a string explaining why this test is to be |
| skipped. If defined, the test will not be run. Instead, it will be |
| reported to the result object as 'skipped' (if the C{TestResult} supports |
| skipping). |
| |
| @ivar suppress: C{None} or a list of tuples of C{(args, kwargs)} to be |
| passed to C{warnings.filterwarnings}. Use these to suppress warnings |
| raised in a test. Useful for testing deprecated code. See also |
| L{util.suppress}. |
| |
| @ivar timeout: A real number of seconds. If set, the test will |
| raise an error if it takes longer than C{timeout} seconds. |
| If not set, util.DEFAULT_TIMEOUT_DURATION is used. |
| |
| @ivar todo: C{None}, a string or a tuple of C{(errors, reason)} where |
| C{errors} is either an exception class or an iterable of exception |
| classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more |
| information. |
| """ |
| |
| implements(itrial.ITestCase) |
| failureException = FailTest |
| |
| def __init__(self, methodName='runTest'): |
| """ |
| Construct an asynchronous test case for C{methodName}. |
| |
| @param methodName: The name of a method on C{self}. This method should |
| be a unit test. That is, it should be a short method that calls some of |
| the assert* methods. If C{methodName} is unspecified, L{runTest} will |
| be used as the test method. This is mostly useful for testing Trial. |
| """ |
| super(TestCase, self).__init__(methodName) |
| self._testMethodName = methodName |
| testMethod = getattr(self, methodName) |
| self._parents = [testMethod, self] |
| self._parents.extend(util.getPythonContainers(testMethod)) |
| self._passed = False |
| self._cleanups = [] |
| |
| if sys.version_info >= (2, 6): |
| # Override the comparison defined by the base TestCase which considers |
| # instances of the same class with the same _testMethodName to be |
| # equal. Since trial puts TestCase instances into a set, that |
| # definition of comparison makes it impossible to run the same test |
| # method twice. Most likely, trial should stop using a set to hold |
| # tests, but until it does, this is necessary on Python 2.6. Only |
| # __eq__ and __ne__ are required here, not __hash__, since the |
| # inherited __hash__ is compatible with these equality semantics. A |
| # different __hash__ might be slightly more efficient (by reducing |
| # collisions), but who cares? -exarkun |
| def __eq__(self, other): |
| return self is other |
| |
| def __ne__(self, other): |
| return self is not other |
| |
| |
| def _run(self, methodName, result): |
| from twisted.internet import reactor |
| timeout = self.getTimeout() |
| def onTimeout(d): |
| e = defer.TimeoutError("%r (%s) still running at %s secs" |
| % (self, methodName, timeout)) |
| f = failure.Failure(e) |
| # try to errback the deferred that the test returns (for no gorram |
| # reason) (see issue1005 and test_errorPropagation in |
| # test_deferred) |
| try: |
| d.errback(f) |
| except defer.AlreadyCalledError: |
| # if the deferred has been called already but the *back chain |
| # is still unfinished, crash the reactor and report timeout |
| # error ourself. |
| reactor.crash() |
| self._timedOut = True # see self._wait |
| todo = self.getTodo() |
| if todo is not None and todo.expected(f): |
| result.addExpectedFailure(self, f, todo) |
| else: |
| result.addError(self, f) |
| onTimeout = utils.suppressWarnings( |
| onTimeout, util.suppress(category=DeprecationWarning)) |
| method = getattr(self, methodName) |
| d = defer.maybeDeferred(utils.runWithWarningsSuppressed, |
| self.getSuppress(), method) |
| call = reactor.callLater(timeout, onTimeout, d) |
| d.addBoth(lambda x : call.active() and call.cancel() or x) |
| return d |
| |
| def shortDescription(self): |
| desc = super(TestCase, self).shortDescription() |
| if desc is None: |
| return self._testMethodName |
| return desc |
| |
| def __call__(self, *args, **kwargs): |
| return self.run(*args, **kwargs) |
| |
| def deferSetUp(self, ignored, result): |
| d = self._run('setUp', result) |
| d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp, |
| callbackArgs=(result,), |
| errbackArgs=(result,)) |
| return d |
| |
| def _ebDeferSetUp(self, failure, result): |
| if failure.check(SkipTest): |
| result.addSkip(self, self._getReason(failure)) |
| else: |
| result.addError(self, failure) |
| if failure.check(KeyboardInterrupt): |
| result.stop() |
| return self.deferRunCleanups(None, result) |
| |
| def deferTestMethod(self, ignored, result): |
| d = self._run(self._testMethodName, result) |
| d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod, |
| callbackArgs=(result,), |
| errbackArgs=(result,)) |
| d.addBoth(self.deferRunCleanups, result) |
| d.addBoth(self.deferTearDown, result) |
| return d |
| |
| def _cbDeferTestMethod(self, ignored, result): |
| if self.getTodo() is not None: |
| result.addUnexpectedSuccess(self, self.getTodo()) |
| else: |
| self._passed = True |
| return ignored |
| |
| def _ebDeferTestMethod(self, f, result): |
| todo = self.getTodo() |
| if todo is not None and todo.expected(f): |
| result.addExpectedFailure(self, f, todo) |
| elif f.check(self.failureException, FailTest): |
| result.addFailure(self, f) |
| elif f.check(KeyboardInterrupt): |
| result.addError(self, f) |
| result.stop() |
| elif f.check(SkipTest): |
| result.addSkip(self, self._getReason(f)) |
| else: |
| result.addError(self, f) |
| |
| def deferTearDown(self, ignored, result): |
| d = self._run('tearDown', result) |
| d.addErrback(self._ebDeferTearDown, result) |
| return d |
| |
| def _ebDeferTearDown(self, failure, result): |
| result.addError(self, failure) |
| if failure.check(KeyboardInterrupt): |
| result.stop() |
| self._passed = False |
| |
| def deferRunCleanups(self, ignored, result): |
| """ |
| Run any scheduled cleanups and report errors (if any to the result |
| object. |
| """ |
| d = self._runCleanups() |
| d.addCallback(self._cbDeferRunCleanups, result) |
| return d |
| |
| def _cbDeferRunCleanups(self, cleanupResults, result): |
| for flag, failure in cleanupResults: |
| if flag == defer.FAILURE: |
| result.addError(self, failure) |
| if failure.check(KeyboardInterrupt): |
| result.stop() |
| self._passed = False |
| |
| def _cleanUp(self, result): |
| try: |
| clean = util._Janitor(self, result).postCaseCleanup() |
| if not clean: |
| self._passed = False |
| except: |
| result.addError(self, failure.Failure()) |
| self._passed = False |
| for error in self._observer.getErrors(): |
| result.addError(self, error) |
| self._passed = False |
| self.flushLoggedErrors() |
| self._removeObserver() |
| if self._passed: |
| result.addSuccess(self) |
| |
| def _classCleanUp(self, result): |
| try: |
| util._Janitor(self, result).postClassCleanup() |
| except: |
| result.addError(self, failure.Failure()) |
| |
| def _makeReactorMethod(self, name): |
| """ |
| Create a method which wraps the reactor method C{name}. The new |
| method issues a deprecation warning and calls the original. |
| """ |
| def _(*a, **kw): |
| warnings.warn("reactor.%s cannot be used inside unit tests. " |
| "In the future, using %s will fail the test and may " |
| "crash or hang the test run." |
| % (name, name), |
| stacklevel=2, category=DeprecationWarning) |
| return self._reactorMethods[name](*a, **kw) |
| return _ |
| |
| def _deprecateReactor(self, reactor): |
| """ |
| Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is, |
| each method is wrapped in a function that issues a deprecation |
| warning, then calls the original. |
| |
| @param reactor: The Twisted reactor. |
| """ |
| self._reactorMethods = {} |
| for name in ['crash', 'iterate', 'stop']: |
| self._reactorMethods[name] = getattr(reactor, name) |
| setattr(reactor, name, self._makeReactorMethod(name)) |
| |
| def _undeprecateReactor(self, reactor): |
| """ |
| Restore the deprecated reactor methods. Undoes what |
| L{_deprecateReactor} did. |
| |
| @param reactor: The Twisted reactor. |
| """ |
| for name, method in self._reactorMethods.iteritems(): |
| setattr(reactor, name, method) |
| self._reactorMethods = {} |
| |
| def _installObserver(self): |
| self._observer = _logObserver |
| self._observer._add() |
| |
| def _removeObserver(self): |
| self._observer._remove() |
| |
| def flushLoggedErrors(self, *errorTypes): |
| """ |
| Remove stored errors received from the log. |
| |
| C{TestCase} stores each error logged during the run of the test and |
| reports them as errors during the cleanup phase (after C{tearDown}). |
| |
| @param *errorTypes: If unspecifed, flush all errors. Otherwise, only |
| flush errors that match the given types. |
| |
| @return: A list of failures that have been removed. |
| """ |
| return self._observer.flushErrors(*errorTypes) |
| |
| |
| def flushWarnings(self, offendingFunctions=None): |
| """ |
| Remove stored warnings from the list of captured warnings and return |
| them. |
| |
| @param offendingFunctions: If C{None}, all warnings issued during the |
| currently running test will be flushed. Otherwise, only warnings |
| which I{point} to a function included in this list will be flushed. |
| All warnings include a filename and source line number; if these |
| parts of a warning point to a source line which is part of a |
| function, then the warning I{points} to that function. |
| @type offendingFunctions: L{NoneType} or L{list} of functions or methods. |
| |
| @raise ValueError: If C{offendingFunctions} is not C{None} and includes |
| an object which is not a L{FunctionType} or L{MethodType} instance. |
| |
| @return: A C{list}, each element of which is a C{dict} giving |
| information about one warning which was flushed by this call. The |
| keys of each C{dict} are: |
| |
| - C{'message'}: The string which was passed as the I{message} |
| parameter to L{warnings.warn}. |
| |
| - C{'category'}: The warning subclass which was passed as the |
| I{category} parameter to L{warnings.warn}. |
| |
| - C{'filename'}: The name of the file containing the definition |
| of the code object which was C{stacklevel} frames above the |
| call to L{warnings.warn}, where C{stacklevel} is the value of |
| the C{stacklevel} parameter passed to L{warnings.warn}. |
| |
| - C{'lineno'}: The source line associated with the active |
| instruction of the code object object which was C{stacklevel} |
| frames above the call to L{warnings.warn}, where |
| C{stacklevel} is the value of the C{stacklevel} parameter |
| passed to L{warnings.warn}. |
| """ |
| if offendingFunctions is None: |
| toFlush = self._warnings[:] |
| self._warnings[:] = [] |
| else: |
| toFlush = [] |
| for aWarning in self._warnings: |
| for aFunction in offendingFunctions: |
| if not isinstance(aFunction, ( |
| types.FunctionType, types.MethodType)): |
| raise ValueError("%r is not a function or method" % ( |
| aFunction,)) |
| |
| # inspect.getabsfile(aFunction) sometimes returns a |
| # filename which disagrees with the filename the warning |
| # system generates. This seems to be because a |
| # function's code object doesn't deal with source files |
| # being renamed. inspect.getabsfile(module) seems |
| # better (or at least agrees with the warning system |
| # more often), and does some normalization for us which |
| # is desirable. inspect.getmodule() is attractive, but |
| # somewhat broken in Python 2.3. See Python bug 4845. |
| aModule = sys.modules[aFunction.__module__] |
| filename = inspect.getabsfile(aModule) |
| |
| if filename != os.path.normcase(aWarning.filename): |
| continue |
| lineStarts = list(_findlinestarts(aFunction.func_code)) |
| first = lineStarts[0][1] |
| last = lineStarts[-1][1] |
| if not (first <= aWarning.lineno <= last): |
| continue |
| # The warning points to this function, flush it and move on |
| # to the next warning. |
| toFlush.append(aWarning) |
| break |
| # Remove everything which is being flushed. |
| map(self._warnings.remove, toFlush) |
| |
| return [ |
| {'message': w.message, 'category': w.category, |
| 'filename': w.filename, 'lineno': w.lineno} |
| for w in toFlush] |
| |
| |
| def addCleanup(self, f, *args, **kwargs): |
| """ |
| Add the given function to a list of functions to be called after the |
| test has run, but before C{tearDown}. |
| |
| Functions will be run in reverse order of being added. This helps |
| ensure that tear down complements set up. |
| |
| The function C{f} may return a Deferred. If so, C{TestCase} will wait |
| until the Deferred has fired before proceeding to the next function. |
| """ |
| self._cleanups.append((f, args, kwargs)) |
| |
| |
| def callDeprecated(self, version, f, *args, **kwargs): |
| """ |
| Call a function that was deprecated at a specific version. |
| |
| @param version: The version that the function was deprecated in. |
| @param f: The deprecated function to call. |
| @return: Whatever the function returns. |
| """ |
| result = f(*args, **kwargs) |
| warningsShown = self.flushWarnings([self.callDeprecated]) |
| |
| if len(warningsShown) == 0: |
| self.fail('%r is not deprecated.' % (f,)) |
| |
| observedWarning = warningsShown[0]['message'] |
| expectedWarning = getDeprecationWarningString(f, version) |
| self.assertEqual(expectedWarning, observedWarning) |
| |
| return result |
| |
| |
| def _runCleanups(self): |
| """ |
| Run the cleanups added with L{addCleanup} in order. |
| |
| @return: A C{Deferred} that fires when all cleanups are run. |
| """ |
| def _makeFunction(f, args, kwargs): |
| return lambda: f(*args, **kwargs) |
| callables = [] |
| while len(self._cleanups) > 0: |
| f, args, kwargs = self._cleanups.pop() |
| callables.append(_makeFunction(f, args, kwargs)) |
| return util._runSequentially(callables) |
| |
| |
| def patch(self, obj, attribute, value): |
| """ |
| Monkey patch an object for the duration of the test. |
| |
| The monkey patch will be reverted at the end of the test using the |
| L{addCleanup} mechanism. |
| |
| The L{MonkeyPatcher} is returned so that users can restore and |
| re-apply the monkey patch within their tests. |
| |
| @param obj: The object to monkey patch. |
| @param attribute: The name of the attribute to change. |
| @param value: The value to set the attribute to. |
| @return: A L{monkey.MonkeyPatcher} object. |
| """ |
| monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value)) |
| monkeyPatch.patch() |
| self.addCleanup(monkeyPatch.restore) |
| return monkeyPatch |
| |
| |
| def runTest(self): |
| """ |
| If no C{methodName} argument is passed to the constructor, L{run} will |
| treat this method as the thing with the actual test inside. |
| """ |
| |
| |
| def run(self, result): |
| """ |
| Run the test case, storing the results in C{result}. |
| |
| First runs C{setUp} on self, then runs the test method (defined in the |
| constructor), then runs C{tearDown}. Any of these may return |
| L{Deferred}s. After they complete, does some reactor cleanup. |
| |
| @param result: A L{TestResult} object. |
| """ |
| log.msg("--> %s <--" % (self.id())) |
| from twisted.internet import reactor |
| new_result = itrial.IReporter(result, None) |
| if new_result is None: |
| result = PyUnitResultAdapter(result) |
| else: |
| result = new_result |
| self._timedOut = False |
| result.startTest(self) |
| if self.getSkip(): # don't run test methods that are marked as .skip |
| result.addSkip(self, self.getSkip()) |
| result.stopTest(self) |
| return |
| self._installObserver() |
| |
| # All the code inside runThunk will be run such that warnings emitted |
| # by it will be collected and retrievable by flushWarnings. |
| def runThunk(): |
| self._passed = False |
| self._deprecateReactor(reactor) |
| try: |
| d = self.deferSetUp(None, result) |
| try: |
| self._wait(d) |
| finally: |
| self._cleanUp(result) |
| self._classCleanUp(result) |
| finally: |
| self._undeprecateReactor(reactor) |
| |
| self._warnings = [] |
| _collectWarnings(self._warnings.append, runThunk) |
| |
| # Any collected warnings which the test method didn't flush get |
| # re-emitted so they'll be logged or show up on stdout or whatever. |
| for w in self.flushWarnings(): |
| try: |
| warnings.warn_explicit(**w) |
| except: |
| result.addError(self, failure.Failure()) |
| |
| result.stopTest(self) |
| |
| |
| def _getReason(self, f): |
| if len(f.value.args) > 0: |
| reason = f.value.args[0] |
| else: |
| warnings.warn(("Do not raise unittest.SkipTest with no " |
| "arguments! Give a reason for skipping tests!"), |
| stacklevel=2) |
| reason = f |
| return reason |
| |
| def getSkip(self): |
| """ |
| Return the skip reason set on this test, if any is set. Checks on the |
| instance first, then the class, then the module, then packages. As |
| soon as it finds something with a C{skip} attribute, returns that. |
| Returns C{None} if it cannot find anything. See L{TestCase} docstring |
| for more details. |
| """ |
| return util.acquireAttribute(self._parents, 'skip', None) |
| |
| def getTodo(self): |
| """ |
| Return a L{Todo} object if the test is marked todo. Checks on the |
| instance first, then the class, then the module, then packages. As |
| soon as it finds something with a C{todo} attribute, returns that. |
| Returns C{None} if it cannot find anything. See L{TestCase} docstring |
| for more details. |
| """ |
| todo = util.acquireAttribute(self._parents, 'todo', None) |
| if todo is None: |
| return None |
| return makeTodo(todo) |
| |
| def getTimeout(self): |
| """ |
| Returns the timeout value set on this test. Checks on the instance |
| first, then the class, then the module, then packages. As soon as it |
| finds something with a C{timeout} attribute, returns that. Returns |
| L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See |
| L{TestCase} docstring for more details. |
| """ |
| timeout = util.acquireAttribute(self._parents, 'timeout', |
| util.DEFAULT_TIMEOUT_DURATION) |
| try: |
| return float(timeout) |
| except (ValueError, TypeError): |
| # XXX -- this is here because sometimes people will have methods |
| # called 'timeout', or set timeout to 'orange', or something |
| # Particularly, test_news.NewsTestCase and ReactorCoreTestCase |
| # both do this. |
| warnings.warn("'timeout' attribute needs to be a number.", |
| category=DeprecationWarning) |
| return util.DEFAULT_TIMEOUT_DURATION |
| |
| def getSuppress(self): |
| """ |
| Returns any warning suppressions set for this test. Checks on the |
| instance first, then the class, then the module, then packages. As |
| soon as it finds something with a C{suppress} attribute, returns that. |
| Returns any empty list (i.e. suppress no warnings) if it cannot find |
| anything. See L{TestCase} docstring for more details. |
| """ |
| return util.acquireAttribute(self._parents, 'suppress', []) |
| |
| |
| def visit(self, visitor): |
| """ |
| Visit this test case. Call C{visitor} with C{self} as a parameter. |
| |
| Deprecated in Twisted 8.0. |
| |
| @param visitor: A callable which expects a single parameter: a test |
| case. |
| |
| @return: None |
| """ |
| warnings.warn("Test visitors deprecated in Twisted 8.0", |
| category=DeprecationWarning) |
| visitor(self) |
| |
| |
| def mktemp(self): |
| """Returns a unique name that may be used as either a temporary |
| directory or filename. |
| |
| @note: you must call os.mkdir on the value returned from this |
| method if you wish to use it as a directory! |
| """ |
| MAX_FILENAME = 32 # some platforms limit lengths of filenames |
| base = os.path.join(self.__class__.__module__[:MAX_FILENAME], |
| self.__class__.__name__[:MAX_FILENAME], |
| self._testMethodName[:MAX_FILENAME]) |
| if not os.path.exists(base): |
| os.makedirs(base) |
| dirname = tempfile.mkdtemp('', '', base) |
| return os.path.join(dirname, 'temp') |
| |
| def _wait(self, d, running=_wait_is_running): |
| """Take a Deferred that only ever callbacks. Block until it happens. |
| """ |
| from twisted.internet import reactor |
| if running: |
| raise RuntimeError("_wait is not reentrant") |
| |
| results = [] |
| def append(any): |
| if results is not None: |
| results.append(any) |
| def crash(ign): |
| if results is not None: |
| reactor.crash() |
| crash = utils.suppressWarnings( |
| crash, util.suppress(message=r'reactor\.crash cannot be used.*', |
| category=DeprecationWarning)) |
| def stop(): |
| reactor.crash() |
| stop = utils.suppressWarnings( |
| stop, util.suppress(message=r'reactor\.crash cannot be used.*', |
| category=DeprecationWarning)) |
| |
| running.append(None) |
| try: |
| d.addBoth(append) |
| if results: |
| # d might have already been fired, in which case append is |
| # called synchronously. Avoid any reactor stuff. |
| return |
| d.addBoth(crash) |
| reactor.stop = stop |
| try: |
| reactor.run() |
| finally: |
| del reactor.stop |
| |
| # If the reactor was crashed elsewhere due to a timeout, hopefully |
| # that crasher also reported an error. Just return. |
| # _timedOut is most likely to be set when d has fired but hasn't |
| # completed its callback chain (see self._run) |
| if results or self._timedOut: #defined in run() and _run() |
| return |
| |
| # If the timeout didn't happen, and we didn't get a result or |
| # a failure, then the user probably aborted the test, so let's |
| # just raise KeyboardInterrupt. |
| |
| # FIXME: imagine this: |
| # web/test/test_webclient.py: |
| # exc = self.assertRaises(error.Error, wait, method(url)) |
| # |
| # wait() will raise KeyboardInterrupt, and assertRaises will |
| # swallow it. Therefore, wait() raising KeyboardInterrupt is |
| # insufficient to stop trial. A suggested solution is to have |
| # this code set a "stop trial" flag, or otherwise notify trial |
| # that it should really try to stop as soon as possible. |
| raise KeyboardInterrupt() |
| finally: |
| results = None |
| running.pop() |
| |
| |
| class UnsupportedTrialFeature(Exception): |
| """A feature of twisted.trial was used that pyunit cannot support.""" |
| |
| |
| |
| class PyUnitResultAdapter(object): |
| """ |
| Wrap a C{TestResult} from the standard library's C{unittest} so that it |
| supports the extended result types from Trial, and also supports |
| L{twisted.python.failure.Failure}s being passed to L{addError} and |
| L{addFailure}. |
| """ |
| |
| def __init__(self, original): |
| """ |
| @param original: A C{TestResult} instance from C{unittest}. |
| """ |
| self.original = original |
| |
| def _exc_info(self, err): |
| return util.excInfoOrFailureToExcInfo(err) |
| |
| def startTest(self, method): |
| self.original.startTest(method) |
| |
| def stopTest(self, method): |
| self.original.stopTest(method) |
| |
| def addFailure(self, test, fail): |
| self.original.addFailure(test, self._exc_info(fail)) |
| |
| def addError(self, test, error): |
| self.original.addError(test, self._exc_info(error)) |
| |
| def _unsupported(self, test, feature, info): |
| self.original.addFailure( |
| test, |
| (UnsupportedTrialFeature, |
| UnsupportedTrialFeature(feature, info), |
| None)) |
| |
| def addSkip(self, test, reason): |
| """ |
| Report the skip as a failure. |
| """ |
| self._unsupported(test, 'skip', reason) |
| |
| def addUnexpectedSuccess(self, test, todo): |
| """ |
| Report the unexpected success as a failure. |
| """ |
| self._unsupported(test, 'unexpected success', todo) |
| |
| def addExpectedFailure(self, test, error): |
| """ |
| Report the expected failure (i.e. todo) as a failure. |
| """ |
| self._unsupported(test, 'expected failure', error) |
| |
| def addSuccess(self, test): |
| self.original.addSuccess(test) |
| |
| def upDownError(self, method, error, warn, printStatus): |
| pass |
| |
| |
| |
| def suiteVisit(suite, visitor): |
| """ |
| Visit each test in C{suite} with C{visitor}. |
| |
| Deprecated in Twisted 8.0. |
| |
| @param visitor: A callable which takes a single argument, the L{TestCase} |
| instance to visit. |
| @return: None |
| """ |
| warnings.warn("Test visitors deprecated in Twisted 8.0", |
| category=DeprecationWarning) |
| for case in suite._tests: |
| visit = getattr(case, 'visit', None) |
| if visit is not None: |
| visit(visitor) |
| elif isinstance(case, pyunit.TestCase): |
| case = itrial.ITestCase(case) |
| case.visit(visitor) |
| elif isinstance(case, pyunit.TestSuite): |
| suiteVisit(case, visitor) |
| else: |
| case.visit(visitor) |
| |
| |
| |
| class TestSuite(pyunit.TestSuite): |
| """ |
| Extend the standard library's C{TestSuite} with support for the visitor |
| pattern and a consistently overrideable C{run} method. |
| """ |
| |
| visit = suiteVisit |
| |
| def __call__(self, result): |
| return self.run(result) |
| |
| |
| def run(self, result): |
| """ |
| Call C{run} on every member of the suite. |
| """ |
| # we implement this because Python 2.3 unittest defines this code |
| # in __call__, whereas 2.4 defines the code in run. |
| for test in self._tests: |
| if result.shouldStop: |
| break |
| test(result) |
| return result |
| |
| |
| |
| class TestDecorator(components.proxyForInterface(itrial.ITestCase, |
| "_originalTest")): |
| """ |
| Decorator for test cases. |
| |
| @param _originalTest: The wrapped instance of test. |
| @type _originalTest: A provider of L{itrial.ITestCase} |
| """ |
| |
| implements(itrial.ITestCase) |
| |
| |
| def __call__(self, result): |
| """ |
| Run the unit test. |
| |
| @param result: A TestResult object. |
| """ |
| return self.run(result) |
| |
| |
| def run(self, result): |
| """ |
| Run the unit test. |
| |
| @param result: A TestResult object. |
| """ |
| return self._originalTest.run( |
| reporter._AdaptedReporter(result, self.__class__)) |
| |
| |
| |
| def _clearSuite(suite): |
| """ |
| Clear all tests from C{suite}. |
| |
| This messes with the internals of C{suite}. In particular, it assumes that |
| the suite keeps all of its tests in a list in an instance variable called |
| C{_tests}. |
| """ |
| suite._tests = [] |
| |
| |
| def decorate(test, decorator): |
| """ |
| Decorate all test cases in C{test} with C{decorator}. |
| |
| C{test} can be a test case or a test suite. If it is a test suite, then the |
| structure of the suite is preserved. |
| |
| L{decorate} tries to preserve the class of the test suites it finds, but |
| assumes the presence of the C{_tests} attribute on the suite. |
| |
| @param test: The C{TestCase} or C{TestSuite} to decorate. |
| |
| @param decorator: A unary callable used to decorate C{TestCase}s. |
| |
| @return: A decorated C{TestCase} or a C{TestSuite} containing decorated |
| C{TestCase}s. |
| """ |
| |
| try: |
| tests = iter(test) |
| except TypeError: |
| return decorator(test) |
| |
| # At this point, we know that 'test' is a test suite. |
| _clearSuite(test) |
| |
| for case in tests: |
| test.addTest(decorate(case, decorator)) |
| return test |
| |
| |
| |
| class _PyUnitTestCaseAdapter(TestDecorator): |
| """ |
| Adapt from pyunit.TestCase to ITestCase. |
| """ |
| |
| |
| def visit(self, visitor): |
| """ |
| Deprecated in Twisted 8.0. |
| """ |
| warnings.warn("Test visitors deprecated in Twisted 8.0", |
| category=DeprecationWarning) |
| visitor(self) |
| |
| |
| |
| class _BrokenIDTestCaseAdapter(_PyUnitTestCaseAdapter): |
| """ |
| Adapter for pyunit-style C{TestCase} subclasses that have undesirable id() |
| methods. That is L{pyunit.FunctionTestCase} and L{pyunit.DocTestCase}. |
| """ |
| |
| def id(self): |
| """ |
| Return the fully-qualified Python name of the doctest. |
| """ |
| testID = self._originalTest.shortDescription() |
| if testID is not None: |
| return testID |
| return self._originalTest.id() |
| |
| |
| |
| class _ForceGarbageCollectionDecorator(TestDecorator): |
| """ |
| Forces garbage collection to be run before and after the test. Any errors |
| logged during the post-test collection are added to the test result as |
| errors. |
| """ |
| |
| def run(self, result): |
| gc.collect() |
| TestDecorator.run(self, result) |
| _logObserver._add() |
| gc.collect() |
| for error in _logObserver.getErrors(): |
| result.addError(self, error) |
| _logObserver.flushErrors() |
| _logObserver._remove() |
| |
| |
| components.registerAdapter( |
| _PyUnitTestCaseAdapter, pyunit.TestCase, itrial.ITestCase) |
| |
| |
| components.registerAdapter( |
| _BrokenIDTestCaseAdapter, pyunit.FunctionTestCase, itrial.ITestCase) |
| |
| |
| _docTestCase = getattr(doctest, 'DocTestCase', None) |
| if _docTestCase: |
| components.registerAdapter( |
| _BrokenIDTestCaseAdapter, _docTestCase, itrial.ITestCase) |
| |
| |
| def _iterateTests(testSuiteOrCase): |
| """ |
| Iterate through all of the test cases in C{testSuiteOrCase}. |
| """ |
| try: |
| suite = iter(testSuiteOrCase) |
| except TypeError: |
| yield testSuiteOrCase |
| else: |
| for test in suite: |
| for subtest in _iterateTests(test): |
| yield subtest |
| |
| |
| |
| # Support for Python 2.3 |
| try: |
| iter(pyunit.TestSuite()) |
| except TypeError: |
| # Python 2.3's TestSuite doesn't support iteration. Let's monkey patch it! |
| def __iter__(self): |
| return iter(self._tests) |
| pyunit.TestSuite.__iter__ = __iter__ |
| |
| |
| |
| class _SubTestCase(TestCase): |
| def __init__(self): |
| TestCase.__init__(self, 'run') |
| |
| _inst = _SubTestCase() |
| |
| def _deprecate(name): |
| """ |
| Internal method used to deprecate top-level assertions. Do not use this. |
| """ |
| def _(*args, **kwargs): |
| warnings.warn("unittest.%s is deprecated. Instead use the %r " |
| "method on unittest.TestCase" % (name, name), |
| stacklevel=2, category=DeprecationWarning) |
| return getattr(_inst, name)(*args, **kwargs) |
| return _ |
| |
| |
| _assertions = ['fail', 'failUnlessEqual', 'failIfEqual', 'failIfEquals', |
| 'failUnless', 'failUnlessIdentical', 'failUnlessIn', |
| 'failIfIdentical', 'failIfIn', 'failIf', |
| 'failUnlessAlmostEqual', 'failIfAlmostEqual', |
| 'failUnlessRaises', 'assertApproximates', |
| 'assertFailure', 'failUnlessSubstring', 'failIfSubstring', |
| 'assertAlmostEqual', 'assertAlmostEquals', |
| 'assertNotAlmostEqual', 'assertNotAlmostEquals', 'assertEqual', |
| 'assertEquals', 'assertNotEqual', 'assertNotEquals', |
| 'assertRaises', 'assert_', 'assertIdentical', |
| 'assertNotIdentical', 'assertIn', 'assertNotIn', |
| 'failUnlessFailure', 'assertSubstring', 'assertNotSubstring'] |
| |
| |
| for methodName in _assertions: |
| globals()[methodName] = _deprecate(methodName) |
| |
| |
| __all__ = ['TestCase', 'FailTest', 'SkipTest'] |