| import gc |
| import io |
| import os |
| import sys |
| import signal |
| import weakref |
| import unittest |
| |
| from test import support |
| |
| |
| @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") |
| @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") |
| class TestBreak(unittest.TestCase): |
| int_handler = None |
| # This number was smart-guessed, previously tests were failing |
| # after 7th run. So, we take `x * 2 + 1` to be sure. |
| default_repeats = 15 |
| |
| def setUp(self): |
| self._default_handler = signal.getsignal(signal.SIGINT) |
| if self.int_handler is not None: |
| signal.signal(signal.SIGINT, self.int_handler) |
| |
| def tearDown(self): |
| signal.signal(signal.SIGINT, self._default_handler) |
| unittest.signals._results = weakref.WeakKeyDictionary() |
| unittest.signals._interrupt_handler = None |
| |
| |
| def withRepeats(self, test_function, repeats=None): |
| if not support.check_impl_detail(cpython=True): |
| # Override repeats count on non-cpython to execute only once. |
| # Because this test only makes sense to be repeated on CPython. |
| repeats = 1 |
| elif repeats is None: |
| repeats = self.default_repeats |
| |
| for repeat in range(repeats): |
| with self.subTest(repeat=repeat): |
| # We don't run `setUp` for the very first repeat |
| # and we don't run `tearDown` for the very last one, |
| # because they are handled by the test class itself. |
| if repeat != 0: |
| self.setUp() |
| try: |
| test_function() |
| finally: |
| if repeat != repeats - 1: |
| self.tearDown() |
| |
| def testInstallHandler(self): |
| default_handler = signal.getsignal(signal.SIGINT) |
| unittest.installHandler() |
| self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| try: |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| except KeyboardInterrupt: |
| self.fail("KeyboardInterrupt not handled") |
| |
| self.assertTrue(unittest.signals._interrupt_handler.called) |
| |
| def testRegisterResult(self): |
| result = unittest.TestResult() |
| self.assertNotIn(result, unittest.signals._results) |
| |
| unittest.registerResult(result) |
| try: |
| self.assertIn(result, unittest.signals._results) |
| finally: |
| unittest.removeResult(result) |
| |
| def testInterruptCaught(self): |
| def test(result): |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| result.breakCaught = True |
| self.assertTrue(result.shouldStop) |
| |
| def test_function(): |
| result = unittest.TestResult() |
| unittest.installHandler() |
| unittest.registerResult(result) |
| |
| self.assertNotEqual( |
| signal.getsignal(signal.SIGINT), |
| self._default_handler, |
| ) |
| |
| try: |
| test(result) |
| except KeyboardInterrupt: |
| self.fail("KeyboardInterrupt not handled") |
| self.assertTrue(result.breakCaught) |
| self.withRepeats(test_function) |
| |
| def testSecondInterrupt(self): |
| # Can't use skipIf decorator because the signal handler may have |
| # been changed after defining this method. |
| if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: |
| self.skipTest("test requires SIGINT to not be ignored") |
| |
| def test(result): |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| result.breakCaught = True |
| self.assertTrue(result.shouldStop) |
| os.kill(pid, signal.SIGINT) |
| self.fail("Second KeyboardInterrupt not raised") |
| |
| def test_function(): |
| result = unittest.TestResult() |
| unittest.installHandler() |
| unittest.registerResult(result) |
| |
| with self.assertRaises(KeyboardInterrupt): |
| test(result) |
| self.assertTrue(result.breakCaught) |
| self.withRepeats(test_function) |
| |
| |
| def testTwoResults(self): |
| def test_function(): |
| unittest.installHandler() |
| |
| result = unittest.TestResult() |
| unittest.registerResult(result) |
| new_handler = signal.getsignal(signal.SIGINT) |
| |
| result2 = unittest.TestResult() |
| unittest.registerResult(result2) |
| self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) |
| |
| result3 = unittest.TestResult() |
| |
| try: |
| os.kill(os.getpid(), signal.SIGINT) |
| except KeyboardInterrupt: |
| self.fail("KeyboardInterrupt not handled") |
| |
| self.assertTrue(result.shouldStop) |
| self.assertTrue(result2.shouldStop) |
| self.assertFalse(result3.shouldStop) |
| self.withRepeats(test_function) |
| |
| |
| def testHandlerReplacedButCalled(self): |
| # Can't use skipIf decorator because the signal handler may have |
| # been changed after defining this method. |
| if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: |
| self.skipTest("test requires SIGINT to not be ignored") |
| |
| def test_function(): |
| # If our handler has been replaced (is no longer installed) but is |
| # called by the *new* handler, then it isn't safe to delay the |
| # SIGINT and we should immediately delegate to the default handler |
| unittest.installHandler() |
| |
| handler = signal.getsignal(signal.SIGINT) |
| def new_handler(frame, signum): |
| handler(frame, signum) |
| signal.signal(signal.SIGINT, new_handler) |
| |
| try: |
| os.kill(os.getpid(), signal.SIGINT) |
| except KeyboardInterrupt: |
| pass |
| else: |
| self.fail("replaced but delegated handler doesn't raise interrupt") |
| self.withRepeats(test_function) |
| |
| def testRunner(self): |
| # Creating a TextTestRunner with the appropriate argument should |
| # register the TextTestResult it creates |
| runner = unittest.TextTestRunner(stream=io.StringIO()) |
| |
| result = runner.run(unittest.TestSuite()) |
| self.assertIn(result, unittest.signals._results) |
| |
| def testWeakReferences(self): |
| # Calling registerResult on a result should not keep it alive |
| result = unittest.TestResult() |
| unittest.registerResult(result) |
| |
| ref = weakref.ref(result) |
| del result |
| |
| # For non-reference counting implementations |
| gc.collect();gc.collect() |
| self.assertIsNone(ref()) |
| |
| |
| def testRemoveResult(self): |
| result = unittest.TestResult() |
| unittest.registerResult(result) |
| |
| unittest.installHandler() |
| self.assertTrue(unittest.removeResult(result)) |
| |
| # Should this raise an error instead? |
| self.assertFalse(unittest.removeResult(unittest.TestResult())) |
| |
| try: |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| except KeyboardInterrupt: |
| pass |
| |
| self.assertFalse(result.shouldStop) |
| |
| def testMainInstallsHandler(self): |
| failfast = object() |
| test = object() |
| verbosity = object() |
| result = object() |
| default_handler = signal.getsignal(signal.SIGINT) |
| |
| class FakeRunner(object): |
| initArgs = [] |
| runArgs = [] |
| def __init__(self, *args, **kwargs): |
| self.initArgs.append((args, kwargs)) |
| def run(self, test): |
| self.runArgs.append(test) |
| return result |
| |
| class Program(unittest.TestProgram): |
| def __init__(self, catchbreak): |
| self.exit = False |
| self.verbosity = verbosity |
| self.failfast = failfast |
| self.catchbreak = catchbreak |
| self.tb_locals = False |
| self.testRunner = FakeRunner |
| self.test = test |
| self.result = None |
| |
| p = Program(False) |
| p.runTests() |
| |
| self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, |
| 'verbosity': verbosity, |
| 'failfast': failfast, |
| 'tb_locals': False, |
| 'warnings': None})]) |
| self.assertEqual(FakeRunner.runArgs, [test]) |
| self.assertEqual(p.result, result) |
| |
| self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| FakeRunner.initArgs = [] |
| FakeRunner.runArgs = [] |
| p = Program(True) |
| p.runTests() |
| |
| self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, |
| 'verbosity': verbosity, |
| 'failfast': failfast, |
| 'tb_locals': False, |
| 'warnings': None})]) |
| self.assertEqual(FakeRunner.runArgs, [test]) |
| self.assertEqual(p.result, result) |
| |
| self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| def testRemoveHandler(self): |
| default_handler = signal.getsignal(signal.SIGINT) |
| unittest.installHandler() |
| unittest.removeHandler() |
| self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| # check that calling removeHandler multiple times has no ill-effect |
| unittest.removeHandler() |
| self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| def testRemoveHandlerAsDecorator(self): |
| default_handler = signal.getsignal(signal.SIGINT) |
| unittest.installHandler() |
| |
| @unittest.removeHandler |
| def test(): |
| self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| test() |
| self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") |
| @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") |
| class TestBreakDefaultIntHandler(TestBreak): |
| int_handler = signal.default_int_handler |
| |
| @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") |
| @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") |
| class TestBreakSignalIgnored(TestBreak): |
| int_handler = signal.SIG_IGN |
| |
| @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") |
| @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") |
| class TestBreakSignalDefault(TestBreak): |
| int_handler = signal.SIG_DFL |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |