import gc | |
import os | |
import sys | |
import signal | |
import weakref | |
from cStringIO import StringIO | |
import unittest | |
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") | |
@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") | |
@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " | |
"if threads have been used") | |
class TestBreak(unittest.TestCase): | |
def setUp(self): | |
self._default_handler = signal.getsignal(signal.SIGINT) | |
def tearDown(self): | |
signal.signal(signal.SIGINT, self._default_handler) | |
unittest.signals._results = weakref.WeakKeyDictionary() | |
unittest.signals._interrupt_handler = None | |
def testInstallHandler(self): | |
default_handler = signal.getsignal(signal.SIGINT) | |
unittest.installHandler() | |
self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) | |
try: | |
pid = os.getpid() | |
os.kill(pid, signal.SIGINT) | |
except KeyboardInterrupt: | |
self.fail("KeyboardInterrupt not handled") | |
self.assertTrue(unittest.signals._interrupt_handler.called) | |
def testRegisterResult(self): | |
result = unittest.TestResult() | |
unittest.registerResult(result) | |
for ref in unittest.signals._results: | |
if ref is result: | |
break | |
elif ref is not result: | |
self.fail("odd object in result set") | |
else: | |
self.fail("result not found") | |
def testInterruptCaught(self): | |
default_handler = signal.getsignal(signal.SIGINT) | |
result = unittest.TestResult() | |
unittest.installHandler() | |
unittest.registerResult(result) | |
self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) | |
def test(result): | |
pid = os.getpid() | |
os.kill(pid, signal.SIGINT) | |
result.breakCaught = True | |
self.assertTrue(result.shouldStop) | |
try: | |
test(result) | |
except KeyboardInterrupt: | |
self.fail("KeyboardInterrupt not handled") | |
self.assertTrue(result.breakCaught) | |
def testSecondInterrupt(self): | |
result = unittest.TestResult() | |
unittest.installHandler() | |
unittest.registerResult(result) | |
def test(result): | |
pid = os.getpid() | |
os.kill(pid, signal.SIGINT) | |
result.breakCaught = True | |
self.assertTrue(result.shouldStop) | |
os.kill(pid, signal.SIGINT) | |
self.fail("Second KeyboardInterrupt not raised") | |
try: | |
test(result) | |
except KeyboardInterrupt: | |
pass | |
else: | |
self.fail("Second KeyboardInterrupt not raised") | |
self.assertTrue(result.breakCaught) | |
def testTwoResults(self): | |
unittest.installHandler() | |
result = unittest.TestResult() | |
unittest.registerResult(result) | |
new_handler = signal.getsignal(signal.SIGINT) | |
result2 = unittest.TestResult() | |
unittest.registerResult(result2) | |
self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) | |
result3 = unittest.TestResult() | |
def test(result): | |
pid = os.getpid() | |
os.kill(pid, signal.SIGINT) | |
try: | |
test(result) | |
except KeyboardInterrupt: | |
self.fail("KeyboardInterrupt not handled") | |
self.assertTrue(result.shouldStop) | |
self.assertTrue(result2.shouldStop) | |
self.assertFalse(result3.shouldStop) | |
def testHandlerReplacedButCalled(self): | |
# If our handler has been replaced (is no longer installed) but is | |
# called by the *new* handler, then it isn't safe to delay the | |
# SIGINT and we should immediately delegate to the default handler | |
unittest.installHandler() | |
handler = signal.getsignal(signal.SIGINT) | |
def new_handler(frame, signum): | |
handler(frame, signum) | |
signal.signal(signal.SIGINT, new_handler) | |
try: | |
pid = os.getpid() | |
os.kill(pid, signal.SIGINT) | |
except KeyboardInterrupt: | |
pass | |
else: | |
self.fail("replaced but delegated handler doesn't raise interrupt") | |
def testRunner(self): | |
# Creating a TextTestRunner with the appropriate argument should | |
# register the TextTestResult it creates | |
runner = unittest.TextTestRunner(stream=StringIO()) | |
result = runner.run(unittest.TestSuite()) | |
self.assertIn(result, unittest.signals._results) | |
def testWeakReferences(self): | |
# Calling registerResult on a result should not keep it alive | |
result = unittest.TestResult() | |
unittest.registerResult(result) | |
ref = weakref.ref(result) | |
del result | |
# For non-reference counting implementations | |
gc.collect();gc.collect() | |
self.assertIsNone(ref()) | |
def testRemoveResult(self): | |
result = unittest.TestResult() | |
unittest.registerResult(result) | |
unittest.installHandler() | |
self.assertTrue(unittest.removeResult(result)) | |
# Should this raise an error instead? | |
self.assertFalse(unittest.removeResult(unittest.TestResult())) | |
try: | |
pid = os.getpid() | |
os.kill(pid, signal.SIGINT) | |
except KeyboardInterrupt: | |
pass | |
self.assertFalse(result.shouldStop) | |
def testMainInstallsHandler(self): | |
failfast = object() | |
test = object() | |
verbosity = object() | |
result = object() | |
default_handler = signal.getsignal(signal.SIGINT) | |
class FakeRunner(object): | |
initArgs = [] | |
runArgs = [] | |
def __init__(self, *args, **kwargs): | |
self.initArgs.append((args, kwargs)) | |
def run(self, test): | |
self.runArgs.append(test) | |
return result | |
class Program(unittest.TestProgram): | |
def __init__(self, catchbreak): | |
self.exit = False | |
self.verbosity = verbosity | |
self.failfast = failfast | |
self.catchbreak = catchbreak | |
self.testRunner = FakeRunner | |
self.test = test | |
self.result = None | |
p = Program(False) | |
p.runTests() | |
self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, | |
'verbosity': verbosity, | |
'failfast': failfast})]) | |
self.assertEqual(FakeRunner.runArgs, [test]) | |
self.assertEqual(p.result, result) | |
self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) | |
FakeRunner.initArgs = [] | |
FakeRunner.runArgs = [] | |
p = Program(True) | |
p.runTests() | |
self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, | |
'verbosity': verbosity, | |
'failfast': failfast})]) | |
self.assertEqual(FakeRunner.runArgs, [test]) | |
self.assertEqual(p.result, result) | |
self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) | |
def testRemoveHandler(self): | |
default_handler = signal.getsignal(signal.SIGINT) | |
unittest.installHandler() | |
unittest.removeHandler() | |
self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) | |
# check that calling removeHandler multiple times has no ill-effect | |
unittest.removeHandler() | |
self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) | |
def testRemoveHandlerAsDecorator(self): | |
default_handler = signal.getsignal(signal.SIGINT) | |
unittest.installHandler() | |
@unittest.removeHandler | |
def test(): | |
self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) | |
test() | |
self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) |