#!/usr/bin/env python | |
# | |
# Copyright 2001-2010 by Vinay Sajip. All Rights Reserved. | |
# | |
# Permission to use, copy, modify, and distribute this software and its | |
# documentation for any purpose and without fee is hereby granted, | |
# provided that the above copyright notice appear in all copies and that | |
# both that copyright notice and this permission notice appear in | |
# supporting documentation, and that the name of Vinay Sajip | |
# not be used in advertising or publicity pertaining to distribution | |
# of the software without specific, written prior permission. | |
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING | |
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL | |
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR | |
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER | |
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
"""Test harness for the logging module. Run all tests. | |
Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved. | |
""" | |
import logging | |
import logging.handlers | |
import logging.config | |
import codecs | |
import cPickle | |
import cStringIO | |
import gc | |
import json | |
import os | |
import re | |
import select | |
import socket | |
from SocketServer import ThreadingTCPServer, StreamRequestHandler | |
import struct | |
import sys | |
import tempfile | |
from test.test_support import captured_stdout, run_with_locale, run_unittest | |
import textwrap | |
import unittest | |
import warnings | |
import weakref | |
try: | |
import threading | |
except ImportError: | |
threading = None | |
class BaseTest(unittest.TestCase): | |
"""Base class for logging tests.""" | |
log_format = "%(name)s -> %(levelname)s: %(message)s" | |
expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$" | |
message_num = 0 | |
def setUp(self): | |
"""Setup the default logging stream to an internal StringIO instance, | |
so that we can examine log output as we want.""" | |
logger_dict = logging.getLogger().manager.loggerDict | |
logging._acquireLock() | |
try: | |
self.saved_handlers = logging._handlers.copy() | |
self.saved_handler_list = logging._handlerList[:] | |
self.saved_loggers = logger_dict.copy() | |
self.saved_level_names = logging._levelNames.copy() | |
finally: | |
logging._releaseLock() | |
# Set two unused loggers: one non-ASCII and one Unicode. | |
# This is to test correct operation when sorting existing | |
# loggers in the configuration code. See issue 8201. | |
logging.getLogger("\xab\xd7\xbb") | |
logging.getLogger(u"\u013f\u00d6\u0047") | |
self.root_logger = logging.getLogger("") | |
self.original_logging_level = self.root_logger.getEffectiveLevel() | |
self.stream = cStringIO.StringIO() | |
self.root_logger.setLevel(logging.DEBUG) | |
self.root_hdlr = logging.StreamHandler(self.stream) | |
self.root_formatter = logging.Formatter(self.log_format) | |
self.root_hdlr.setFormatter(self.root_formatter) | |
self.root_logger.addHandler(self.root_hdlr) | |
def tearDown(self): | |
"""Remove our logging stream, and restore the original logging | |
level.""" | |
self.stream.close() | |
self.root_logger.removeHandler(self.root_hdlr) | |
while self.root_logger.handlers: | |
h = self.root_logger.handlers[0] | |
self.root_logger.removeHandler(h) | |
h.close() | |
self.root_logger.setLevel(self.original_logging_level) | |
logging._acquireLock() | |
try: | |
logging._levelNames.clear() | |
logging._levelNames.update(self.saved_level_names) | |
logging._handlers.clear() | |
logging._handlers.update(self.saved_handlers) | |
logging._handlerList[:] = self.saved_handler_list | |
loggerDict = logging.getLogger().manager.loggerDict | |
loggerDict.clear() | |
loggerDict.update(self.saved_loggers) | |
finally: | |
logging._releaseLock() | |
def assert_log_lines(self, expected_values, stream=None): | |
"""Match the collected log lines against the regular expression | |
self.expected_log_pat, and compare the extracted group values to | |
the expected_values list of tuples.""" | |
stream = stream or self.stream | |
pat = re.compile(self.expected_log_pat) | |
try: | |
stream.reset() | |
actual_lines = stream.readlines() | |
except AttributeError: | |
# StringIO.StringIO lacks a reset() method. | |
actual_lines = stream.getvalue().splitlines() | |
self.assertEqual(len(actual_lines), len(expected_values)) | |
for actual, expected in zip(actual_lines, expected_values): | |
match = pat.search(actual) | |
if not match: | |
self.fail("Log line does not match expected pattern:\n" + | |
actual) | |
self.assertEqual(tuple(match.groups()), expected) | |
s = stream.read() | |
if s: | |
self.fail("Remaining output at end of log stream:\n" + s) | |
def next_message(self): | |
"""Generate a message consisting solely of an auto-incrementing | |
integer.""" | |
self.message_num += 1 | |
return "%d" % self.message_num | |
class BuiltinLevelsTest(BaseTest): | |
"""Test builtin levels and their inheritance.""" | |
def test_flat(self): | |
#Logging levels in a flat logger namespace. | |
m = self.next_message | |
ERR = logging.getLogger("ERR") | |
ERR.setLevel(logging.ERROR) | |
INF = logging.getLogger("INF") | |
INF.setLevel(logging.INFO) | |
DEB = logging.getLogger("DEB") | |
DEB.setLevel(logging.DEBUG) | |
# These should log. | |
ERR.log(logging.CRITICAL, m()) | |
ERR.error(m()) | |
INF.log(logging.CRITICAL, m()) | |
INF.error(m()) | |
INF.warn(m()) | |
INF.info(m()) | |
DEB.log(logging.CRITICAL, m()) | |
DEB.error(m()) | |
DEB.warn (m()) | |
DEB.info (m()) | |
DEB.debug(m()) | |
# These should not log. | |
ERR.warn(m()) | |
ERR.info(m()) | |
ERR.debug(m()) | |
INF.debug(m()) | |
self.assert_log_lines([ | |
('ERR', 'CRITICAL', '1'), | |
('ERR', 'ERROR', '2'), | |
('INF', 'CRITICAL', '3'), | |
('INF', 'ERROR', '4'), | |
('INF', 'WARNING', '5'), | |
('INF', 'INFO', '6'), | |
('DEB', 'CRITICAL', '7'), | |
('DEB', 'ERROR', '8'), | |
('DEB', 'WARNING', '9'), | |
('DEB', 'INFO', '10'), | |
('DEB', 'DEBUG', '11'), | |
]) | |
def test_nested_explicit(self): | |
# Logging levels in a nested namespace, all explicitly set. | |
m = self.next_message | |
INF = logging.getLogger("INF") | |
INF.setLevel(logging.INFO) | |
INF_ERR = logging.getLogger("INF.ERR") | |
INF_ERR.setLevel(logging.ERROR) | |
# These should log. | |
INF_ERR.log(logging.CRITICAL, m()) | |
INF_ERR.error(m()) | |
# These should not log. | |
INF_ERR.warn(m()) | |
INF_ERR.info(m()) | |
INF_ERR.debug(m()) | |
self.assert_log_lines([ | |
('INF.ERR', 'CRITICAL', '1'), | |
('INF.ERR', 'ERROR', '2'), | |
]) | |
def test_nested_inherited(self): | |
#Logging levels in a nested namespace, inherited from parent loggers. | |
m = self.next_message | |
INF = logging.getLogger("INF") | |
INF.setLevel(logging.INFO) | |
INF_ERR = logging.getLogger("INF.ERR") | |
INF_ERR.setLevel(logging.ERROR) | |
INF_UNDEF = logging.getLogger("INF.UNDEF") | |
INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") | |
UNDEF = logging.getLogger("UNDEF") | |
# These should log. | |
INF_UNDEF.log(logging.CRITICAL, m()) | |
INF_UNDEF.error(m()) | |
INF_UNDEF.warn(m()) | |
INF_UNDEF.info(m()) | |
INF_ERR_UNDEF.log(logging.CRITICAL, m()) | |
INF_ERR_UNDEF.error(m()) | |
# These should not log. | |
INF_UNDEF.debug(m()) | |
INF_ERR_UNDEF.warn(m()) | |
INF_ERR_UNDEF.info(m()) | |
INF_ERR_UNDEF.debug(m()) | |
self.assert_log_lines([ | |
('INF.UNDEF', 'CRITICAL', '1'), | |
('INF.UNDEF', 'ERROR', '2'), | |
('INF.UNDEF', 'WARNING', '3'), | |
('INF.UNDEF', 'INFO', '4'), | |
('INF.ERR.UNDEF', 'CRITICAL', '5'), | |
('INF.ERR.UNDEF', 'ERROR', '6'), | |
]) | |
def test_nested_with_virtual_parent(self): | |
# Logging levels when some parent does not exist yet. | |
m = self.next_message | |
INF = logging.getLogger("INF") | |
GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") | |
CHILD = logging.getLogger("INF.BADPARENT") | |
INF.setLevel(logging.INFO) | |
# These should log. | |
GRANDCHILD.log(logging.FATAL, m()) | |
GRANDCHILD.info(m()) | |
CHILD.log(logging.FATAL, m()) | |
CHILD.info(m()) | |
# These should not log. | |
GRANDCHILD.debug(m()) | |
CHILD.debug(m()) | |
self.assert_log_lines([ | |
('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), | |
('INF.BADPARENT.UNDEF', 'INFO', '2'), | |
('INF.BADPARENT', 'CRITICAL', '3'), | |
('INF.BADPARENT', 'INFO', '4'), | |
]) | |
class BasicFilterTest(BaseTest): | |
"""Test the bundled Filter class.""" | |
def test_filter(self): | |
# Only messages satisfying the specified criteria pass through the | |
# filter. | |
filter_ = logging.Filter("spam.eggs") | |
handler = self.root_logger.handlers[0] | |
try: | |
handler.addFilter(filter_) | |
spam = logging.getLogger("spam") | |
spam_eggs = logging.getLogger("spam.eggs") | |
spam_eggs_fish = logging.getLogger("spam.eggs.fish") | |
spam_bakedbeans = logging.getLogger("spam.bakedbeans") | |
spam.info(self.next_message()) | |
spam_eggs.info(self.next_message()) # Good. | |
spam_eggs_fish.info(self.next_message()) # Good. | |
spam_bakedbeans.info(self.next_message()) | |
self.assert_log_lines([ | |
('spam.eggs', 'INFO', '2'), | |
('spam.eggs.fish', 'INFO', '3'), | |
]) | |
finally: | |
handler.removeFilter(filter_) | |
# | |
# First, we define our levels. There can be as many as you want - the only | |
# limitations are that they should be integers, the lowest should be > 0 and | |
# larger values mean less information being logged. If you need specific | |
# level values which do not fit into these limitations, you can use a | |
# mapping dictionary to convert between your application levels and the | |
# logging system. | |
# | |
SILENT = 120 | |
TACITURN = 119 | |
TERSE = 118 | |
EFFUSIVE = 117 | |
SOCIABLE = 116 | |
VERBOSE = 115 | |
TALKATIVE = 114 | |
GARRULOUS = 113 | |
CHATTERBOX = 112 | |
BORING = 111 | |
LEVEL_RANGE = range(BORING, SILENT + 1) | |
# | |
# Next, we define names for our levels. You don't need to do this - in which | |
# case the system will use "Level n" to denote the text for the level. | |
# | |
my_logging_levels = { | |
SILENT : 'Silent', | |
TACITURN : 'Taciturn', | |
TERSE : 'Terse', | |
EFFUSIVE : 'Effusive', | |
SOCIABLE : 'Sociable', | |
VERBOSE : 'Verbose', | |
TALKATIVE : 'Talkative', | |
GARRULOUS : 'Garrulous', | |
CHATTERBOX : 'Chatterbox', | |
BORING : 'Boring', | |
} | |
class GarrulousFilter(logging.Filter): | |
"""A filter which blocks garrulous messages.""" | |
def filter(self, record): | |
return record.levelno != GARRULOUS | |
class VerySpecificFilter(logging.Filter): | |
"""A filter which blocks sociable and taciturn messages.""" | |
def filter(self, record): | |
return record.levelno not in [SOCIABLE, TACITURN] | |
class CustomLevelsAndFiltersTest(BaseTest): | |
"""Test various filtering possibilities with custom logging levels.""" | |
# Skip the logger name group. | |
expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" | |
def setUp(self): | |
BaseTest.setUp(self) | |
for k, v in my_logging_levels.items(): | |
logging.addLevelName(k, v) | |
def log_at_all_levels(self, logger): | |
for lvl in LEVEL_RANGE: | |
logger.log(lvl, self.next_message()) | |
def test_logger_filter(self): | |
# Filter at logger level. | |
self.root_logger.setLevel(VERBOSE) | |
# Levels >= 'Verbose' are good. | |
self.log_at_all_levels(self.root_logger) | |
self.assert_log_lines([ | |
('Verbose', '5'), | |
('Sociable', '6'), | |
('Effusive', '7'), | |
('Terse', '8'), | |
('Taciturn', '9'), | |
('Silent', '10'), | |
]) | |
def test_handler_filter(self): | |
# Filter at handler level. | |
self.root_logger.handlers[0].setLevel(SOCIABLE) | |
try: | |
# Levels >= 'Sociable' are good. | |
self.log_at_all_levels(self.root_logger) | |
self.assert_log_lines([ | |
('Sociable', '6'), | |
('Effusive', '7'), | |
('Terse', '8'), | |
('Taciturn', '9'), | |
('Silent', '10'), | |
]) | |
finally: | |
self.root_logger.handlers[0].setLevel(logging.NOTSET) | |
def test_specific_filters(self): | |
# Set a specific filter object on the handler, and then add another | |
# filter object on the logger itself. | |
handler = self.root_logger.handlers[0] | |
specific_filter = None | |
garr = GarrulousFilter() | |
handler.addFilter(garr) | |
try: | |
self.log_at_all_levels(self.root_logger) | |
first_lines = [ | |
# Notice how 'Garrulous' is missing | |
('Boring', '1'), | |
('Chatterbox', '2'), | |
('Talkative', '4'), | |
('Verbose', '5'), | |
('Sociable', '6'), | |
('Effusive', '7'), | |
('Terse', '8'), | |
('Taciturn', '9'), | |
('Silent', '10'), | |
] | |
self.assert_log_lines(first_lines) | |
specific_filter = VerySpecificFilter() | |
self.root_logger.addFilter(specific_filter) | |
self.log_at_all_levels(self.root_logger) | |
self.assert_log_lines(first_lines + [ | |
# Not only 'Garrulous' is still missing, but also 'Sociable' | |
# and 'Taciturn' | |
('Boring', '11'), | |
('Chatterbox', '12'), | |
('Talkative', '14'), | |
('Verbose', '15'), | |
('Effusive', '17'), | |
('Terse', '18'), | |
('Silent', '20'), | |
]) | |
finally: | |
if specific_filter: | |
self.root_logger.removeFilter(specific_filter) | |
handler.removeFilter(garr) | |
class MemoryHandlerTest(BaseTest): | |
"""Tests for the MemoryHandler.""" | |
# Do not bother with a logger name group. | |
expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" | |
def setUp(self): | |
BaseTest.setUp(self) | |
self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, | |
self.root_hdlr) | |
self.mem_logger = logging.getLogger('mem') | |
self.mem_logger.propagate = 0 | |
self.mem_logger.addHandler(self.mem_hdlr) | |
def tearDown(self): | |
self.mem_hdlr.close() | |
BaseTest.tearDown(self) | |
def test_flush(self): | |
# The memory handler flushes to its target handler based on specific | |
# criteria (message count and message level). | |
self.mem_logger.debug(self.next_message()) | |
self.assert_log_lines([]) | |
self.mem_logger.info(self.next_message()) | |
self.assert_log_lines([]) | |
# This will flush because the level is >= logging.WARNING | |
self.mem_logger.warn(self.next_message()) | |
lines = [ | |
('DEBUG', '1'), | |
('INFO', '2'), | |
('WARNING', '3'), | |
] | |
self.assert_log_lines(lines) | |
for n in (4, 14): | |
for i in range(9): | |
self.mem_logger.debug(self.next_message()) | |
self.assert_log_lines(lines) | |
# This will flush because it's the 10th message since the last | |
# flush. | |
self.mem_logger.debug(self.next_message()) | |
lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] | |
self.assert_log_lines(lines) | |
self.mem_logger.debug(self.next_message()) | |
self.assert_log_lines(lines) | |
class ExceptionFormatter(logging.Formatter): | |
"""A special exception formatter.""" | |
def formatException(self, ei): | |
return "Got a [%s]" % ei[0].__name__ | |
class ConfigFileTest(BaseTest): | |
"""Reading logging config from a .ini-style config file.""" | |
expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" | |
# config0 is a standard configuration. | |
config0 = """ | |
[loggers] | |
keys=root | |
[handlers] | |
keys=hand1 | |
[formatters] | |
keys=form1 | |
[logger_root] | |
level=WARNING | |
handlers=hand1 | |
[handler_hand1] | |
class=StreamHandler | |
level=NOTSET | |
formatter=form1 | |
args=(sys.stdout,) | |
[formatter_form1] | |
format=%(levelname)s ++ %(message)s | |
datefmt= | |
""" | |
# config1 adds a little to the standard configuration. | |
config1 = """ | |
[loggers] | |
keys=root,parser | |
[handlers] | |
keys=hand1 | |
[formatters] | |
keys=form1 | |
[logger_root] | |
level=WARNING | |
handlers= | |
[logger_parser] | |
level=DEBUG | |
handlers=hand1 | |
propagate=1 | |
qualname=compiler.parser | |
[handler_hand1] | |
class=StreamHandler | |
level=NOTSET | |
formatter=form1 | |
args=(sys.stdout,) | |
[formatter_form1] | |
format=%(levelname)s ++ %(message)s | |
datefmt= | |
""" | |
# config1a moves the handler to the root. | |
config1a = """ | |
[loggers] | |
keys=root,parser | |
[handlers] | |
keys=hand1 | |
[formatters] | |
keys=form1 | |
[logger_root] | |
level=WARNING | |
handlers=hand1 | |
[logger_parser] | |
level=DEBUG | |
handlers= | |
propagate=1 | |
qualname=compiler.parser | |
[handler_hand1] | |
class=StreamHandler | |
level=NOTSET | |
formatter=form1 | |
args=(sys.stdout,) | |
[formatter_form1] | |
format=%(levelname)s ++ %(message)s | |
datefmt= | |
""" | |
# config2 has a subtle configuration error that should be reported | |
config2 = config1.replace("sys.stdout", "sys.stbout") | |
# config3 has a less subtle configuration error | |
config3 = config1.replace("formatter=form1", "formatter=misspelled_name") | |
# config4 specifies a custom formatter class to be loaded | |
config4 = """ | |
[loggers] | |
keys=root | |
[handlers] | |
keys=hand1 | |
[formatters] | |
keys=form1 | |
[logger_root] | |
level=NOTSET | |
handlers=hand1 | |
[handler_hand1] | |
class=StreamHandler | |
level=NOTSET | |
formatter=form1 | |
args=(sys.stdout,) | |
[formatter_form1] | |
class=""" + __name__ + """.ExceptionFormatter | |
format=%(levelname)s:%(name)s:%(message)s | |
datefmt= | |
""" | |
# config5 specifies a custom handler class to be loaded | |
config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') | |
# config6 uses ', ' delimiters in the handlers and formatters sections | |
config6 = """ | |
[loggers] | |
keys=root,parser | |
[handlers] | |
keys=hand1, hand2 | |
[formatters] | |
keys=form1, form2 | |
[logger_root] | |
level=WARNING | |
handlers= | |
[logger_parser] | |
level=DEBUG | |
handlers=hand1 | |
propagate=1 | |
qualname=compiler.parser | |
[handler_hand1] | |
class=StreamHandler | |
level=NOTSET | |
formatter=form1 | |
args=(sys.stdout,) | |
[handler_hand2] | |
class=StreamHandler | |
level=NOTSET | |
formatter=form1 | |
args=(sys.stderr,) | |
[formatter_form1] | |
format=%(levelname)s ++ %(message)s | |
datefmt= | |
[formatter_form2] | |
format=%(message)s | |
datefmt= | |
""" | |
# config7 adds a compiler logger. | |
config7 = """ | |
[loggers] | |
keys=root,parser,compiler | |
[handlers] | |
keys=hand1 | |
[formatters] | |
keys=form1 | |
[logger_root] | |
level=WARNING | |
handlers=hand1 | |
[logger_compiler] | |
level=DEBUG | |
handlers= | |
propagate=1 | |
qualname=compiler | |
[logger_parser] | |
level=DEBUG | |
handlers= | |
propagate=1 | |
qualname=compiler.parser | |
[handler_hand1] | |
class=StreamHandler | |
level=NOTSET | |
formatter=form1 | |
args=(sys.stdout,) | |
[formatter_form1] | |
format=%(levelname)s ++ %(message)s | |
datefmt= | |
""" | |
def apply_config(self, conf): | |
file = cStringIO.StringIO(textwrap.dedent(conf)) | |
logging.config.fileConfig(file) | |
def test_config0_ok(self): | |
# A simple config file which overrides the default settings. | |
with captured_stdout() as output: | |
self.apply_config(self.config0) | |
logger = logging.getLogger() | |
# Won't output anything | |
logger.info(self.next_message()) | |
# Outputs a message | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('ERROR', '2'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
def test_config1_ok(self, config=config1): | |
# A config file defining a sub-parser as well. | |
with captured_stdout() as output: | |
self.apply_config(config) | |
logger = logging.getLogger("compiler.parser") | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '1'), | |
('ERROR', '2'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
def test_config2_failure(self): | |
# A simple config file which overrides the default settings. | |
self.assertRaises(StandardError, self.apply_config, self.config2) | |
def test_config3_failure(self): | |
# A simple config file which overrides the default settings. | |
self.assertRaises(StandardError, self.apply_config, self.config3) | |
def test_config4_ok(self): | |
# A config file specifying a custom formatter class. | |
with captured_stdout() as output: | |
self.apply_config(self.config4) | |
logger = logging.getLogger() | |
try: | |
raise RuntimeError() | |
except RuntimeError: | |
logging.exception("just testing") | |
sys.stdout.seek(0) | |
self.assertEqual(output.getvalue(), | |
"ERROR:root:just testing\nGot a [RuntimeError]\n") | |
# Original logger output is empty | |
self.assert_log_lines([]) | |
def test_config5_ok(self): | |
self.test_config1_ok(config=self.config5) | |
def test_config6_ok(self): | |
self.test_config1_ok(config=self.config6) | |
def test_config7_ok(self): | |
with captured_stdout() as output: | |
self.apply_config(self.config1a) | |
logger = logging.getLogger("compiler.parser") | |
# See issue #11424. compiler-hyphenated sorts | |
# between compiler and compiler.xyz and this | |
# was preventing compiler.xyz from being included | |
# in the child loggers of compiler because of an | |
# overzealous loop termination condition. | |
hyphenated = logging.getLogger('compiler-hyphenated') | |
# All will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
hyphenated.critical(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '1'), | |
('ERROR', '2'), | |
('CRITICAL', '3'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
with captured_stdout() as output: | |
self.apply_config(self.config7) | |
logger = logging.getLogger("compiler.parser") | |
self.assertFalse(logger.disabled) | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
logger = logging.getLogger("compiler.lexer") | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
# Will not appear | |
hyphenated.critical(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '4'), | |
('ERROR', '5'), | |
('INFO', '6'), | |
('ERROR', '7'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
class LogRecordStreamHandler(StreamRequestHandler): | |
"""Handler for a streaming logging request. It saves the log message in the | |
TCP server's 'log_output' attribute.""" | |
TCP_LOG_END = "!!!END!!!" | |
def handle(self): | |
"""Handle multiple requests - each expected to be of 4-byte length, | |
followed by the LogRecord in pickle format. Logs the record | |
according to whatever policy is configured locally.""" | |
while True: | |
chunk = self.connection.recv(4) | |
if len(chunk) < 4: | |
break | |
slen = struct.unpack(">L", chunk)[0] | |
chunk = self.connection.recv(slen) | |
while len(chunk) < slen: | |
chunk = chunk + self.connection.recv(slen - len(chunk)) | |
obj = self.unpickle(chunk) | |
record = logging.makeLogRecord(obj) | |
self.handle_log_record(record) | |
def unpickle(self, data): | |
return cPickle.loads(data) | |
def handle_log_record(self, record): | |
# If the end-of-messages sentinel is seen, tell the server to | |
# terminate. | |
if self.TCP_LOG_END in record.msg: | |
self.server.abort = 1 | |
return | |
self.server.log_output += record.msg + "\n" | |
class LogRecordSocketReceiver(ThreadingTCPServer): | |
"""A simple-minded TCP socket-based logging receiver suitable for test | |
purposes.""" | |
allow_reuse_address = 1 | |
log_output = "" | |
def __init__(self, host='localhost', | |
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, | |
handler=LogRecordStreamHandler): | |
ThreadingTCPServer.__init__(self, (host, port), handler) | |
self.abort = False | |
self.timeout = 0.1 | |
self.finished = threading.Event() | |
def serve_until_stopped(self): | |
while not self.abort: | |
rd, wr, ex = select.select([self.socket.fileno()], [], [], | |
self.timeout) | |
if rd: | |
self.handle_request() | |
# Notify the main thread that we're about to exit | |
self.finished.set() | |
# close the listen socket | |
self.server_close() | |
@unittest.skipUnless(threading, 'Threading required for this test.') | |
class SocketHandlerTest(BaseTest): | |
"""Test for SocketHandler objects.""" | |
def setUp(self): | |
"""Set up a TCP server to receive log messages, and a SocketHandler | |
pointing to that server's address and port.""" | |
BaseTest.setUp(self) | |
self.tcpserver = LogRecordSocketReceiver(port=0) | |
self.port = self.tcpserver.socket.getsockname()[1] | |
self.threads = [ | |
threading.Thread(target=self.tcpserver.serve_until_stopped)] | |
for thread in self.threads: | |
thread.start() | |
self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port) | |
self.sock_hdlr.setFormatter(self.root_formatter) | |
self.root_logger.removeHandler(self.root_logger.handlers[0]) | |
self.root_logger.addHandler(self.sock_hdlr) | |
def tearDown(self): | |
"""Shutdown the TCP server.""" | |
try: | |
self.tcpserver.abort = True | |
del self.tcpserver | |
self.root_logger.removeHandler(self.sock_hdlr) | |
self.sock_hdlr.close() | |
for thread in self.threads: | |
thread.join(2.0) | |
finally: | |
BaseTest.tearDown(self) | |
def get_output(self): | |
"""Get the log output as received by the TCP server.""" | |
# Signal the TCP receiver and wait for it to terminate. | |
self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END) | |
self.tcpserver.finished.wait(2.0) | |
return self.tcpserver.log_output | |
def test_output(self): | |
# The log message sent to the SocketHandler is properly received. | |
logger = logging.getLogger("tcp") | |
logger.error("spam") | |
logger.debug("eggs") | |
self.assertEqual(self.get_output(), "spam\neggs\n") | |
class MemoryTest(BaseTest): | |
"""Test memory persistence of logger objects.""" | |
def setUp(self): | |
"""Create a dict to remember potentially destroyed objects.""" | |
BaseTest.setUp(self) | |
self._survivors = {} | |
def _watch_for_survival(self, *args): | |
"""Watch the given objects for survival, by creating weakrefs to | |
them.""" | |
for obj in args: | |
key = id(obj), repr(obj) | |
self._survivors[key] = weakref.ref(obj) | |
def _assertTruesurvival(self): | |
"""Assert that all objects watched for survival have survived.""" | |
# Trigger cycle breaking. | |
gc.collect() | |
dead = [] | |
for (id_, repr_), ref in self._survivors.items(): | |
if ref() is None: | |
dead.append(repr_) | |
if dead: | |
self.fail("%d objects should have survived " | |
"but have been destroyed: %s" % (len(dead), ", ".join(dead))) | |
def test_persistent_loggers(self): | |
# Logger objects are persistent and retain their configuration, even | |
# if visible references are destroyed. | |
self.root_logger.setLevel(logging.INFO) | |
foo = logging.getLogger("foo") | |
self._watch_for_survival(foo) | |
foo.setLevel(logging.DEBUG) | |
self.root_logger.debug(self.next_message()) | |
foo.debug(self.next_message()) | |
self.assert_log_lines([ | |
('foo', 'DEBUG', '2'), | |
]) | |
del foo | |
# foo has survived. | |
self._assertTruesurvival() | |
# foo has retained its settings. | |
bar = logging.getLogger("foo") | |
bar.debug(self.next_message()) | |
self.assert_log_lines([ | |
('foo', 'DEBUG', '2'), | |
('foo', 'DEBUG', '3'), | |
]) | |
class EncodingTest(BaseTest): | |
def test_encoding_plain_file(self): | |
# In Python 2.x, a plain file object is treated as having no encoding. | |
log = logging.getLogger("test") | |
fn = tempfile.mktemp(".log") | |
# the non-ascii data we write to the log. | |
data = "foo\x80" | |
try: | |
handler = logging.FileHandler(fn) | |
log.addHandler(handler) | |
try: | |
# write non-ascii data to the log. | |
log.warning(data) | |
finally: | |
log.removeHandler(handler) | |
handler.close() | |
# check we wrote exactly those bytes, ignoring trailing \n etc | |
f = open(fn) | |
try: | |
self.assertEqual(f.read().rstrip(), data) | |
finally: | |
f.close() | |
finally: | |
if os.path.isfile(fn): | |
os.remove(fn) | |
def test_encoding_cyrillic_unicode(self): | |
log = logging.getLogger("test") | |
#Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) | |
message = u'\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' | |
#Ensure it's written in a Cyrillic encoding | |
writer_class = codecs.getwriter('cp1251') | |
writer_class.encoding = 'cp1251' | |
stream = cStringIO.StringIO() | |
writer = writer_class(stream, 'strict') | |
handler = logging.StreamHandler(writer) | |
log.addHandler(handler) | |
try: | |
log.warning(message) | |
finally: | |
log.removeHandler(handler) | |
handler.close() | |
# check we wrote exactly those bytes, ignoring trailing \n etc | |
s = stream.getvalue() | |
#Compare against what the data should be when encoded in CP-1251 | |
self.assertEqual(s, '\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') | |
class WarningsTest(BaseTest): | |
def test_warnings(self): | |
with warnings.catch_warnings(): | |
logging.captureWarnings(True) | |
try: | |
warnings.filterwarnings("always", category=UserWarning) | |
file = cStringIO.StringIO() | |
h = logging.StreamHandler(file) | |
logger = logging.getLogger("py.warnings") | |
logger.addHandler(h) | |
warnings.warn("I'm warning you...") | |
logger.removeHandler(h) | |
s = file.getvalue() | |
h.close() | |
self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) | |
#See if an explicit file uses the original implementation | |
file = cStringIO.StringIO() | |
warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, | |
file, "Dummy line") | |
s = file.getvalue() | |
file.close() | |
self.assertEqual(s, | |
"dummy.py:42: UserWarning: Explicit\n Dummy line\n") | |
finally: | |
logging.captureWarnings(False) | |
def formatFunc(format, datefmt=None): | |
return logging.Formatter(format, datefmt) | |
def handlerFunc(): | |
return logging.StreamHandler() | |
class CustomHandler(logging.StreamHandler): | |
pass | |
class ConfigDictTest(BaseTest): | |
"""Reading logging config from a dictionary.""" | |
expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" | |
# config0 is a standard configuration. | |
config0 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
'handlers' : ['hand1'], | |
}, | |
} | |
# config1 adds a little to the standard configuration. | |
config1 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
# config2 has a subtle configuration error that should be reported | |
config2 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdbout', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
#As config1 but with a misspelt level on a handler | |
config2a = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NTOSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
#As config1 but with a misspelt level on a logger | |
config2b = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WRANING', | |
}, | |
} | |
# config3 has a less subtle configuration error | |
config3 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'misspelled_name', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
# config4 specifies a custom formatter class to be loaded | |
config4 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'()' : __name__ + '.ExceptionFormatter', | |
'format' : '%(levelname)s:%(name)s:%(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'root' : { | |
'level' : 'NOTSET', | |
'handlers' : ['hand1'], | |
}, | |
} | |
# As config4 but using an actual callable rather than a string | |
config4a = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'()' : ExceptionFormatter, | |
'format' : '%(levelname)s:%(name)s:%(message)s', | |
}, | |
'form2' : { | |
'()' : __name__ + '.formatFunc', | |
'format' : '%(levelname)s:%(name)s:%(message)s', | |
}, | |
'form3' : { | |
'()' : formatFunc, | |
'format' : '%(levelname)s:%(name)s:%(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
'hand2' : { | |
'()' : handlerFunc, | |
}, | |
}, | |
'root' : { | |
'level' : 'NOTSET', | |
'handlers' : ['hand1'], | |
}, | |
} | |
# config5 specifies a custom handler class to be loaded | |
config5 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : __name__ + '.CustomHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
# config6 specifies a custom handler class to be loaded | |
# but has bad arguments | |
config6 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : __name__ + '.CustomHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
'9' : 'invalid parameter name', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
#config 7 does not define compiler.parser but defines compiler.lexer | |
#so compiler.parser should be disabled after applying it | |
config7 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'loggers' : { | |
'compiler.lexer' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
config8 = { | |
'version': 1, | |
'disable_existing_loggers' : False, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'loggers' : { | |
'compiler' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
'compiler.lexer' : { | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
config9 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'WARNING', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'WARNING', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'NOTSET', | |
}, | |
} | |
config9a = { | |
'version': 1, | |
'incremental' : True, | |
'handlers' : { | |
'hand1' : { | |
'level' : 'WARNING', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'INFO', | |
}, | |
}, | |
} | |
config9b = { | |
'version': 1, | |
'incremental' : True, | |
'handlers' : { | |
'hand1' : { | |
'level' : 'INFO', | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'INFO', | |
}, | |
}, | |
} | |
#As config1 but with a filter added | |
config10 = { | |
'version': 1, | |
'formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'filters' : { | |
'filt1' : { | |
'name' : 'compiler.parser', | |
}, | |
}, | |
'handlers' : { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
'filters' : ['filt1'], | |
}, | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'filters' : ['filt1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
'handlers' : ['hand1'], | |
}, | |
} | |
#As config1 but using cfg:// references | |
config11 = { | |
'version': 1, | |
'true_formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handler_configs': { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'formatters' : 'cfg://true_formatters', | |
'handlers' : { | |
'hand1' : 'cfg://handler_configs[hand1]', | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
#As config11 but missing the version key | |
config12 = { | |
'true_formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handler_configs': { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'formatters' : 'cfg://true_formatters', | |
'handlers' : { | |
'hand1' : 'cfg://handler_configs[hand1]', | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
#As config11 but using an unsupported version | |
config13 = { | |
'version': 2, | |
'true_formatters': { | |
'form1' : { | |
'format' : '%(levelname)s ++ %(message)s', | |
}, | |
}, | |
'handler_configs': { | |
'hand1' : { | |
'class' : 'logging.StreamHandler', | |
'formatter' : 'form1', | |
'level' : 'NOTSET', | |
'stream' : 'ext://sys.stdout', | |
}, | |
}, | |
'formatters' : 'cfg://true_formatters', | |
'handlers' : { | |
'hand1' : 'cfg://handler_configs[hand1]', | |
}, | |
'loggers' : { | |
'compiler.parser' : { | |
'level' : 'DEBUG', | |
'handlers' : ['hand1'], | |
}, | |
}, | |
'root' : { | |
'level' : 'WARNING', | |
}, | |
} | |
def apply_config(self, conf): | |
logging.config.dictConfig(conf) | |
def test_config0_ok(self): | |
# A simple config which overrides the default settings. | |
with captured_stdout() as output: | |
self.apply_config(self.config0) | |
logger = logging.getLogger() | |
# Won't output anything | |
logger.info(self.next_message()) | |
# Outputs a message | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('ERROR', '2'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
def test_config1_ok(self, config=config1): | |
# A config defining a sub-parser as well. | |
with captured_stdout() as output: | |
self.apply_config(config) | |
logger = logging.getLogger("compiler.parser") | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '1'), | |
('ERROR', '2'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
def test_config2_failure(self): | |
# A simple config which overrides the default settings. | |
self.assertRaises(StandardError, self.apply_config, self.config2) | |
def test_config2a_failure(self): | |
# A simple config which overrides the default settings. | |
self.assertRaises(StandardError, self.apply_config, self.config2a) | |
def test_config2b_failure(self): | |
# A simple config which overrides the default settings. | |
self.assertRaises(StandardError, self.apply_config, self.config2b) | |
def test_config3_failure(self): | |
# A simple config which overrides the default settings. | |
self.assertRaises(StandardError, self.apply_config, self.config3) | |
def test_config4_ok(self): | |
# A config specifying a custom formatter class. | |
with captured_stdout() as output: | |
self.apply_config(self.config4) | |
#logger = logging.getLogger() | |
try: | |
raise RuntimeError() | |
except RuntimeError: | |
logging.exception("just testing") | |
sys.stdout.seek(0) | |
self.assertEqual(output.getvalue(), | |
"ERROR:root:just testing\nGot a [RuntimeError]\n") | |
# Original logger output is empty | |
self.assert_log_lines([]) | |
def test_config4a_ok(self): | |
# A config specifying a custom formatter class. | |
with captured_stdout() as output: | |
self.apply_config(self.config4a) | |
#logger = logging.getLogger() | |
try: | |
raise RuntimeError() | |
except RuntimeError: | |
logging.exception("just testing") | |
sys.stdout.seek(0) | |
self.assertEqual(output.getvalue(), | |
"ERROR:root:just testing\nGot a [RuntimeError]\n") | |
# Original logger output is empty | |
self.assert_log_lines([]) | |
def test_config5_ok(self): | |
self.test_config1_ok(config=self.config5) | |
def test_config6_failure(self): | |
self.assertRaises(StandardError, self.apply_config, self.config6) | |
def test_config7_ok(self): | |
with captured_stdout() as output: | |
self.apply_config(self.config1) | |
logger = logging.getLogger("compiler.parser") | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '1'), | |
('ERROR', '2'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
with captured_stdout() as output: | |
self.apply_config(self.config7) | |
logger = logging.getLogger("compiler.parser") | |
self.assertTrue(logger.disabled) | |
logger = logging.getLogger("compiler.lexer") | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '3'), | |
('ERROR', '4'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
#Same as test_config_7_ok but don't disable old loggers. | |
def test_config_8_ok(self): | |
with captured_stdout() as output: | |
self.apply_config(self.config1) | |
logger = logging.getLogger("compiler.parser") | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '1'), | |
('ERROR', '2'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
with captured_stdout() as output: | |
self.apply_config(self.config8) | |
logger = logging.getLogger("compiler.parser") | |
self.assertFalse(logger.disabled) | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
logger = logging.getLogger("compiler.lexer") | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '3'), | |
('ERROR', '4'), | |
('INFO', '5'), | |
('ERROR', '6'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
def test_config_9_ok(self): | |
with captured_stdout() as output: | |
self.apply_config(self.config9) | |
logger = logging.getLogger("compiler.parser") | |
#Nothing will be output since both handler and logger are set to WARNING | |
logger.info(self.next_message()) | |
self.assert_log_lines([], stream=output) | |
self.apply_config(self.config9a) | |
#Nothing will be output since both handler is still set to WARNING | |
logger.info(self.next_message()) | |
self.assert_log_lines([], stream=output) | |
self.apply_config(self.config9b) | |
#Message should now be output | |
logger.info(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '3'), | |
], stream=output) | |
def test_config_10_ok(self): | |
with captured_stdout() as output: | |
self.apply_config(self.config10) | |
logger = logging.getLogger("compiler.parser") | |
logger.warning(self.next_message()) | |
logger = logging.getLogger('compiler') | |
#Not output, because filtered | |
logger.warning(self.next_message()) | |
logger = logging.getLogger('compiler.lexer') | |
#Not output, because filtered | |
logger.warning(self.next_message()) | |
logger = logging.getLogger("compiler.parser.codegen") | |
#Output, as not filtered | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('WARNING', '1'), | |
('ERROR', '4'), | |
], stream=output) | |
def test_config11_ok(self): | |
self.test_config1_ok(self.config11) | |
def test_config12_failure(self): | |
self.assertRaises(StandardError, self.apply_config, self.config12) | |
def test_config13_failure(self): | |
self.assertRaises(StandardError, self.apply_config, self.config13) | |
@unittest.skipUnless(threading, 'listen() needs threading to work') | |
def setup_via_listener(self, text): | |
# Ask for a randomly assigned port (by using port 0) | |
t = logging.config.listen(0) | |
t.start() | |
t.ready.wait() | |
# Now get the port allocated | |
port = t.port | |
t.ready.clear() | |
try: | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.settimeout(2.0) | |
sock.connect(('localhost', port)) | |
slen = struct.pack('>L', len(text)) | |
s = slen + text | |
sentsofar = 0 | |
left = len(s) | |
while left > 0: | |
sent = sock.send(s[sentsofar:]) | |
sentsofar += sent | |
left -= sent | |
sock.close() | |
finally: | |
t.ready.wait(2.0) | |
logging.config.stopListening() | |
t.join(2.0) | |
def test_listen_config_10_ok(self): | |
with captured_stdout() as output: | |
self.setup_via_listener(json.dumps(self.config10)) | |
logger = logging.getLogger("compiler.parser") | |
logger.warning(self.next_message()) | |
logger = logging.getLogger('compiler') | |
#Not output, because filtered | |
logger.warning(self.next_message()) | |
logger = logging.getLogger('compiler.lexer') | |
#Not output, because filtered | |
logger.warning(self.next_message()) | |
logger = logging.getLogger("compiler.parser.codegen") | |
#Output, as not filtered | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('WARNING', '1'), | |
('ERROR', '4'), | |
], stream=output) | |
def test_listen_config_1_ok(self): | |
with captured_stdout() as output: | |
self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) | |
logger = logging.getLogger("compiler.parser") | |
# Both will output a message | |
logger.info(self.next_message()) | |
logger.error(self.next_message()) | |
self.assert_log_lines([ | |
('INFO', '1'), | |
('ERROR', '2'), | |
], stream=output) | |
# Original logger output is empty. | |
self.assert_log_lines([]) | |
class ManagerTest(BaseTest): | |
def test_manager_loggerclass(self): | |
logged = [] | |
class MyLogger(logging.Logger): | |
def _log(self, level, msg, args, exc_info=None, extra=None): | |
logged.append(msg) | |
man = logging.Manager(None) | |
self.assertRaises(TypeError, man.setLoggerClass, int) | |
man.setLoggerClass(MyLogger) | |
logger = man.getLogger('test') | |
logger.warning('should appear in logged') | |
logging.warning('should not appear in logged') | |
self.assertEqual(logged, ['should appear in logged']) | |
class ChildLoggerTest(BaseTest): | |
def test_child_loggers(self): | |
r = logging.getLogger() | |
l1 = logging.getLogger('abc') | |
l2 = logging.getLogger('def.ghi') | |
c1 = r.getChild('xyz') | |
c2 = r.getChild('uvw.xyz') | |
self.assertTrue(c1 is logging.getLogger('xyz')) | |
self.assertTrue(c2 is logging.getLogger('uvw.xyz')) | |
c1 = l1.getChild('def') | |
c2 = c1.getChild('ghi') | |
c3 = l1.getChild('def.ghi') | |
self.assertTrue(c1 is logging.getLogger('abc.def')) | |
self.assertTrue(c2 is logging.getLogger('abc.def.ghi')) | |
self.assertTrue(c2 is c3) | |
# Set the locale to the platform-dependent default. I have no idea | |
# why the test does this, but in any case we save the current locale | |
# first and restore it at the end. | |
@run_with_locale('LC_ALL', '') | |
def test_main(): | |
run_unittest(BuiltinLevelsTest, BasicFilterTest, | |
CustomLevelsAndFiltersTest, MemoryHandlerTest, | |
ConfigFileTest, SocketHandlerTest, MemoryTest, | |
EncodingTest, WarningsTest, ConfigDictTest, ManagerTest, | |
ChildLoggerTest) | |
if __name__ == "__main__": | |
test_main() |