Logging Cookbook

Author

Vinay Sajip <Vinay_Sajip at Red Dove Dot com>

This page contains a number of recipes related to logging, which have been found useful in the past.

Using logging in multiple modules

Multiple calls to logging.getLogger('someLogger') return a reference to the same logger object. This is true not only within the same module, but also across modules as long as it is in the same Python interpreter process. It is true for references to the same object; additionally, application code can define and configure a parent logger in one module and create (but not configure) a child logger in a separate module, and all logger calls to the child will pass up to the parent. Here is a main module:

import logging
import auxiliary_module

# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)

logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')

Here is the auxiliary module:

import logging

# create logger
module_logger = logging.getLogger('spam_application.auxiliary')

class Auxiliary:
    def __init__(self):
        self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
        self.logger.info('creating an instance of Auxiliary')

    def do_something(self):
        self.logger.info('doing something')
        a = 1 + 1
        self.logger.info('done doing something')

def some_function():
    module_logger.info('received a call to "some_function"')

The output looks like this:

2005-03-23 23:47:11,663 - spam_application - INFO -
   creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
   creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
   created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
   calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
   doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
   done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
   finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
   calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
   received a call to "some_function"
2005-03-23 23:47:11,673 - spam_application - INFO -
   done with auxiliary_module.some_function()

Logging from multiple threads

Logging from multiple threads requires no special effort. The following example shows logging from the main (initial) thread and another thread:

import logging
import threading
import time

def worker(arg):
    while not arg['stop']:
        logging.debug('Hi from myfunc')
        time.sleep(0.5)

def main():
    logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
    info = {'stop': False}
    thread = threading.Thread(target=worker, args=(info,))
    thread.start()
    while True:
        try:
            logging.debug('Hello from main')
            time.sleep(0.75)
        except KeyboardInterrupt:
            info['stop'] = True
            break
    thread.join()

if __name__ == '__main__':
    main()

When run, the script should print something like the following:

   0 Thread-1 Hi from myfunc
   3 MainThread Hello from main
 505 Thread-1 Hi from myfunc
 755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc

This shows the logging output interspersed as one might expect. This approach works for more threads than shown here, of course.

Multiple handlers and formatters

Loggers are plain Python objects. The addHandler() method has no minimum or maximum quota for the number of handlers you may add. Sometimes it will be beneficial for an application to log all messages of all severities to a text file while simultaneously logging errors or above to the console. To set this up, simply configure the appropriate handlers. The logging calls in the application code will remain unchanged. Here is a slight modification to the previous simple module-based configuration example:

import logging

logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)

# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')

Notice that the 'application' code does not care about multiple handlers. All that changed was the addition and configuration of a new handler named fh.

The ability to create new handlers with higher- or lower-severity filters can be very helpful when writing and testing an application. Instead of using many print statements for debugging, use logger.debug: unlike the print statements, which you will have to delete or comment out later, the logger.debug statements can remain intact in the source code and remain dormant until you need them again. At that point, the only change that needs to happen is to modify the severity level of the logger and/or handler to debug.
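
As a quick illustration of the point above, here is a minimal, self-contained sketch (the logger name and messages are invented for the demo) showing a logger.debug() call lying dormant until the level is lowered:

```python
import io
import logging

# Log to a string buffer so the result is easy to inspect
stream = io.StringIO()
logger = logging.getLogger('debug_demo')
logger.addHandler(logging.StreamHandler(stream))

logger.setLevel(logging.INFO)
logger.debug('dormant message')   # filtered out at INFO level

logger.setLevel(logging.DEBUG)    # the only change needed to switch debugging on
logger.debug('now emitted')
```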

Logging to multiple destinations

Let's say you want to log to console and file with different message formats and in differing circumstances. Say you want to log messages with levels of DEBUG and higher to file, and those messages at level INFO and higher to the console. Let's also assume that the file should contain timestamps, but the console messages should not. Here's how you can achieve this:

import logging

# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M',
                    filename='/temp/myapp.log',
                    filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

When you run this, on the console you will see

root        : INFO     Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO     How quickly daft jumping zebras vex.
myapp.area2 : WARNING  Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR    The five boxing wizards jump quickly.

and in the file you will see something like

10-22 22:19 root         INFO     Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1  DEBUG    Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1  INFO     How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2  WARNING  Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2  ERROR    The five boxing wizards jump quickly.

As you can see, the DEBUG message only shows up in the file. The other messages are sent to both destinations.

This example uses console and file handlers, but you can use any number and combination of handlers you choose.

Configuration server example

Here is an example of a module using the logging configuration server:

import logging
import logging.config
import time
import os

# read initial config file
logging.config.fileConfig('logging.conf')

# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()

logger = logging.getLogger('simpleExample')

try:
    # loop through logging calls to see the difference
    # new configurations make, until Ctrl+C is pressed
    while True:
        logger.debug('debug message')
        logger.info('info message')
        logger.warning('warn message')
        logger.error('error message')
        logger.critical('critical message')
        time.sleep(5)
except KeyboardInterrupt:
    # cleanup
    logging.config.stopListening()
    t.join()

And here is a script that takes a filename and sends that file to the server, properly preceded with the binary-encoded length, as the new logging configuration:

#!/usr/bin/env python
import socket, sys, struct

with open(sys.argv[1], 'rb') as f:
    data_to_send = f.read()

HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')

Dealing with handlers that block

Sometimes you have to get your logging handlers to do their work without blocking the thread you're logging from. This is common in web applications, though of course it also occurs in other scenarios.

A common culprit which demonstrates sluggish behaviour is the SMTPHandler: sending emails can take a long time, for a number of reasons outside the developer's control (for example, a poorly performing mail or network infrastructure). But almost any network-based handler can block: even a SocketHandler operation may do a DNS query under the hood which is too slow (and this query can be deep in the socket library code, below the Python layer, and outside your control).

One solution is to use a two-part approach. For the first part, attach only a QueueHandler to those loggers which are accessed from performance-critical threads. They simply write to their queue, which can be sized to a large enough capacity or initialized with no upper bound to their size. The write to the queue will typically be accepted quickly, though you will probably need to catch the queue.Full exception as a precaution in your code. If you are a library developer who has performance-critical threads in their code, be sure to document this (together with a suggestion to attach only QueueHandlers to your loggers) for the benefit of other developers who will use your code.
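
As a sketch of the precaution mentioned above, here is one way of dealing with queue.Full: a hypothetical QueueHandler subclass (the name is invented for this demo) whose enqueue() drops records rather than raising when a bounded queue fills up:

```python
import logging
import logging.handlers
import queue

class DroppingQueueHandler(logging.handlers.QueueHandler):
    """Drop records instead of raising queue.Full on a bounded queue."""
    def enqueue(self, record):
        try:
            self.queue.put_nowait(record)
        except queue.Full:
            pass  # record is dropped; you could increment a counter here instead

que = queue.Queue(maxsize=2)      # deliberately tiny for the demo
handler = DroppingQueueHandler(que)
for i in range(5):                # 5 records offered, only 2 fit
    handler.emit(logging.LogRecord('demo', logging.INFO, __file__, 1,
                                   'message %d', (i,), None))
```

Whether dropping, blocking, or counting overflow is the right policy depends on your application; the point is only that the failure mode of a bounded queue should be handled deliberately.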

The second part of the solution is QueueListener, which has been designed as the counterpart to QueueHandler. A QueueListener is very simple: it's passed a queue and some handlers, and it fires up an internal thread which listens to its queue for LogRecords sent from QueueHandlers (or any other source of LogRecords, for that matter). The LogRecords are removed from the queue and passed to the handlers for processing.

The advantage of having a separate QueueListener class is that you can use the same instance to service multiple QueueHandlers. This is more resource-friendly than, say, having threaded versions of the existing handler classes, which would eat up one thread per handler for no particular benefit.

An example of using these two classes follows (imports omitted):

que = queue.Queue(-1)  # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()

which, when run, will produce:

MainThread: Look out!

Changed in version 3.5: Prior to Python 3.5, the QueueListener always passed every message received from the queue to every handler it was initialized with. (This was because it was assumed that level filtering was all done on the other side, where the queue is filled.) From 3.5 onwards, this behaviour can be changed by passing a keyword argument respect_handler_level=True to the listener's constructor. When this is done, the listener compares the level of each message with the handler's level, and only passes a message to a handler if it's appropriate to do so.
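
For example, here is a minimal sketch (handler setup and names invented for the demo) of respect_handler_level=True in action, with the handler writing to a string buffer so the filtering effect is visible:

```python
import io
import logging
import logging.handlers
import queue

que = queue.Queue(-1)
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setLevel(logging.WARNING)   # the listener will honour this level

listener = logging.handlers.QueueListener(que, handler,
                                          respect_handler_level=True)
logger = logging.getLogger('listener_demo')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.handlers.QueueHandler(que))

listener.start()
logger.debug('dropped by the handler-level check')
logger.warning('warning delivered')
listener.stop()   # flushes the queue and joins the internal thread
```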

Sending and receiving logging events across a network

Let's say you want to send logging events across a network, and handle them at the receiving end. A simple way of doing this is attaching a SocketHandler instance to the root logger at the sending end:

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

At the receiving end, you can set up a receiver using the socketserver module. Here is a basic working example:

import pickle
import logging
import logging.handlers
import socketserver
import struct


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

First run the server, and then the client. On the client side, nothing is printed on the console; on the server side, you should see something like:

About to start TCP server...
   59 root            INFO     Jackdaws love my big sphinx of quartz.
   59 myapp.area1     DEBUG    Quick zephyrs blow, vexing daft Jim.
   69 myapp.area1     INFO     How quickly daft jumping zebras vex.
   69 myapp.area2     WARNING  Jail zesty vixen who grabbed pay from quack.
   69 myapp.area2     ERROR    The five boxing wizards jump quickly.

Note that there are some security issues with pickle in some scenarios. If these affect you, you can use an alternative serialization scheme by overriding the makePickle() method and implementing your alternative there, as well as adapting the above script to use your alternative serialization.
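
For instance, here is a sketch of one such alternative: a hypothetical SocketHandler subclass (name and field choices invented for the demo) that overrides makePickle() to produce length-prefixed JSON instead of a pickle. The receiver would of course need adapting to decode JSON rather than unpickle:

```python
import json
import logging
import logging.handlers
import struct

class JSONSocketHandler(logging.handlers.SocketHandler):
    """Serialize records as length-prefixed JSON instead of pickle."""
    def makePickle(self, record):
        d = dict(record.__dict__)
        d['msg'] = record.getMessage()   # merge any args into the message
        d['args'] = None
        d['exc_info'] = None
        payload = json.dumps(d, default=str).encode('utf-8')
        # Same >L length prefix as the pickle-based wire format
        return struct.pack('>L', len(payload)) + payload

# makePickle can be exercised without a live socket connection:
handler = JSONSocketHandler('localhost', 9999)
rec = logging.LogRecord('demo', logging.INFO, __file__, 1,
                        'hi %s', ('there',), None)
data = handler.makePickle(rec)
```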

Adding contextual information to your logging output

Sometimes you want logging output to contain contextual information in addition to the parameters passed to the logging call. For example, in a networked application, it may be desirable to log client-specific information in the log (e.g. remote client's username, or IP address). Although you could use the extra parameter to achieve this, it's not always convenient to pass the information in this way. While it might be tempting to create Logger instances on a per-connection basis, this is not a good idea because these instances are not garbage collected. While this is not a problem in practice, when the number of Logger instances is dependent on the level of granularity you want to use in logging an application, it could be hard to manage if the number of Logger instances becomes effectively unbounded.

Using LoggerAdapters to impart contextual information

An easy way in which you can pass contextual information to be output along with logging event information is to use the LoggerAdapter class. This class is designed to look like a Logger, so that you can call debug(), info(), warning(), error(), exception(), critical() and log(). These methods have the same signatures as their counterparts in Logger, so you can use the two types of instances interchangeably.

When you create an instance of LoggerAdapter, you pass it a Logger instance and a dict-like object which contains your contextual information. When you call one of the logging methods on an instance of LoggerAdapter, it delegates the call to the underlying instance of Logger passed to its constructor, and arranges to pass the contextual information in the delegated call. Here's a snippet from the code of LoggerAdapter:

def debug(self, msg, /, *args, **kwargs):
    """
    Delegate a debug call to the underlying logger, after adding
    contextual information from this adapter instance.
    """
    msg, kwargs = self.process(msg, kwargs)
    self.logger.debug(msg, *args, **kwargs)

The process() method of LoggerAdapter is where the contextual information is added to the logging output. It's passed the message and keyword arguments of the logging call, and it passes back (potentially) modified versions of these to use in the call to the underlying logger. The default implementation of this method leaves the message alone, but inserts an 'extra' key in the keyword arguments whose value is the dict-like object passed to the constructor. Of course, if you had passed an 'extra' keyword argument in the call to the adapter, it will be silently overwritten.

The advantage of using 'extra' is that the values in the dict-like object are merged into the LogRecord instance's __dict__, allowing you to use customized strings with your Formatter instances which know about the keys of the dict-like object. If you need a different method, e.g. if you want to prepend or append the contextual information to the message string, you just need to subclass LoggerAdapter and override process() to do what you need. Here is a simple example:

class CustomAdapter(logging.LoggerAdapter):
    """
    This example adapter expects the passed in dict-like object to have a
    'connid' key, whose value in brackets is prepended to the log message.
    """
    def process(self, msg, kwargs):
        return '[%s] %s' % (self.extra['connid'], msg), kwargs

which you can use like this:

logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})

Then any events that you log to the adapter will have the value of some_conn_id prepended to the log messages.
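
Here is a self-contained, runnable version of the snippet above, with the made-up value 'abc123' standing in for some_conn_id and a string buffer standing in for a real destination:

```python
import io
import logging

class CustomAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        # Prepend the connection id, in brackets, to every message
        return '[%s] %s' % (self.extra['connid'], msg), kwargs

stream = io.StringIO()
logger = logging.getLogger('adapter_demo')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream))

adapter = CustomAdapter(logger, {'connid': 'abc123'})
adapter.info('new connection')
```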

Using objects other than dicts to pass contextual information

You don't need to pass an actual dict to a LoggerAdapter - you could pass an instance of a class which implements __getitem__ and __iter__ so that it looks like a dict to logging. This would be useful if you want to generate values dynamically (whereas the values in a dict would be constant).
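
For example, here is a minimal sketch (class name and keys invented for the demo) of an object that computes its values on each access, passed where a dict would normally go:

```python
import io
import logging
import os

class RuntimeContext:
    """Dict-like object whose values are computed on each access."""
    def __getitem__(self, name):
        if name == 'pid':
            return os.getpid()          # looked up dynamically, not stored
        raise KeyError(name)
    def __iter__(self):
        return iter(['pid'])

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter('%(pid)s %(message)s'))
logger = logging.getLogger('context_demo')
logger.setLevel(logging.INFO)
logger.addHandler(handler)

adapter = logging.LoggerAdapter(logger, RuntimeContext())
adapter.info('dynamic context')
```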

Using Filters to impart contextual information

You can also add contextual information to log output using a user-defined Filter. Filter instances are allowed to modify the LogRecords passed to them, including adding additional attributes which can then be output using a suitable format string, or if needed a custom Formatter.

For example, in a web application, the request being processed (or at least, the interesting parts of it) can be stored in a threadlocal (threading.local) variable, and then accessed from a Filter to add, say, information from the request - say, the remote IP address and remote user's username - to the LogRecord, using the attribute names 'ip' and 'user' as in the LoggerAdapter example above. In that case, the same format string can be used to get similar output to that shown above. Here's an example script:

import logging
from random import choice

class ContextFilter(logging.Filter):
    """
    This is a filter which injects contextual information into the log.

    Rather than use actual contextual information, we just use random
    data in this demo.
    """

    USERS = ['jim', 'fred', 'sheila']
    IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']

    def filter(self, record):

        record.ip = choice(ContextFilter.IPS)
        record.user = choice(ContextFilter.USERS)
        return True

if __name__ == '__main__':
    levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    a1 = logging.getLogger('a.b.c')
    a2 = logging.getLogger('d.e.f')

    f = ContextFilter()
    a1.addFilter(f)
    a2.addFilter(f)
    a1.debug('A debug message')
    a1.info('An info message with %s', 'some parameters')
    for x in range(10):
        lvl = choice(levels)
        lvlname = logging.getLevelName(lvl)
        a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')

which, when run, produces something like:

2010-09-06 22:38:15,292 a.b.c DEBUG    IP: 123.231.231.123 User: fred     A debug message
2010-09-06 22:38:15,300 a.b.c INFO     IP: 192.168.0.1     User: sheila   An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 127.0.0.1       User: jim      A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 127.0.0.1       User: sheila   A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 123.231.231.123 User: fred     A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1     User: jim      A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 192.168.0.1     User: jim      A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR    IP: 127.0.0.1       User: sheila   A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG    IP: 123.231.231.123 User: fred     A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO     IP: 123.231.231.123 User: fred     A message at INFO level with 2 parameters

Logging to a single file from multiple processes

Although logging is thread-safe, and logging to a single file from multiple threads in a single process is supported, logging to a single file from multiple processes is not supported, because there is no standard way to serialize access to a single file across multiple processes in Python. If you need to log to a single file from multiple processes, one way of doing this is to have all the processes log to a SocketHandler, and have a separate process which implements a socket server which reads from the socket and logs to file. (If you prefer, you can dedicate one thread in one of the existing processes to perform this function.) This section documents this approach in more detail and includes a working socket receiver which can be used as a starting point for you to adapt in your own applications.

You could also write your own handler which uses the Lock class from the multiprocessing module to serialize access to the file from your processes. The existing FileHandler and subclasses do not make use of multiprocessing at present, though they may do so in the future. Note that at present, the multiprocessing module does not provide working lock functionality on all platforms (see https://bugs.python.org/issue3770).
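
Such a handler is not provided by the standard library, but the idea can be sketched as follows (class name invented; shown running in a single process for clarity - in real use, the parent process would create the one lock and arrange for every child to build its handler around it):

```python
import logging
import multiprocessing
import os
import tempfile

class LockedFileHandler(logging.FileHandler):
    """Sketch: a FileHandler that serializes emits across processes
    using a multiprocessing.Lock shared by all of them."""
    def __init__(self, filename, lock, mode='a'):
        self.mp_lock = lock               # one Lock shared by every process
        super().__init__(filename, mode)

    def emit(self, record):
        with self.mp_lock:                # cross-process mutual exclusion
            super().emit(record)

lock = multiprocessing.Lock()
path = os.path.join(tempfile.mkdtemp(), 'locked.log')
handler = LockedFileHandler(path, lock)
logger = logging.getLogger('locked_demo')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info('serialized write')
handler.close()
```

Note that interleaving is only prevented if every process opens the file in append mode and takes the same lock; the sketch does nothing about the platform caveats mentioned above.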

Alternatively, you can use a Queue and a QueueHandler to send all logging events to one of the processes in your multi-process application. The following example script demonstrates how you can do this; in the example a separate listener process listens for events sent by other processes and logs them according to its own logging configuration. Although the example only demonstrates one way of doing it (for example, you may want to use a listener thread rather than a separate listener process - the implementation would be analogous), it does allow for completely different logging configurations for the listener and the other processes in your application, and can be used as the basis for code meeting your own specific requirements:

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing

# Next two import lines for this demo only
from random import choice, random
import time

#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

A variant of the above script keeps the logging in the main process, in a separate thread:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

This variant shows how you can e.g. apply configuration for particular loggers - e.g. the foo logger has a special handler which stores all events in the foo subsystem in the file mplog-foo.log. This will be used by the logging machinery in the main process (even though the logging events are generated in the worker processes) to direct the messages to the appropriate destinations.

Using concurrent.futures.ProcessPoolExecutor

If you want to use concurrent.futures.ProcessPoolExecutor to start your worker processes, you need to create the queue slightly differently. Instead of

queue = multiprocessing.Queue(-1)

you should use

queue = multiprocessing.Manager().Queue(-1)  # also works with the examples above

and you can then replace the worker creation from this:

workers = []
for i in range(10):
    worker = multiprocessing.Process(target=worker_process,
                                     args=(queue, worker_configurer))
    workers.append(worker)
    worker.start()
for w in workers:
    w.join()

to this (remembering to first import concurrent.futures):

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for i in range(10):
        executor.submit(worker_process, queue, worker_configurer)

Using file rotation

Sometimes you want to let a log file grow to a certain size, then open a new file and log to that. You may want to keep a certain number of these files, and when that many files have been created, rotate the files so that the number of files and the size of the files both remain bounded. For this usage pattern, the logging package provides a RotatingFileHandler:

import glob
import logging
import logging.handlers

LOG_FILENAME = 'logging_rotatingfile_example.out'

# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)

# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
              LOG_FILENAME, maxBytes=20, backupCount=5)

my_logger.addHandler(handler)

# Log some messages
for i in range(20):
    my_logger.debug('i = %d' % i)

# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)

for filename in logfiles:
    print(filename)

The result should be 6 separate files, each with part of the log history for the application:

logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5

The most current file is always logging_rotatingfile_example.out, and each time it reaches the size limit it is renamed with the suffix .1. Each of the existing backup files is renamed to increment the suffix (.1 becomes .2, etc.) and the .6 file is erased.

Obviously this example sets the log length much too small as an extreme example. You would want to set maxBytes to an appropriate value.

Use of alternative formatting styles

When logging was added to the Python standard library, the only way of formatting messages with variable content was to use the %-formatting method. Since then, Python has gained two new formatting approaches: string.Template (added in Python 2.4) and str.format() (added in Python 2.6).

Logging (as of 3.2) provides improved support for these two additional formatting styles. The Formatter class has been enhanced to take an additional, optional keyword parameter named style. This defaults to '%', but other possible values are '{' and '$', which correspond to the other two formatting styles. Backwards compatibility is maintained by default (as you would expect), but by explicitly specifying a style parameter, you get the ability to specify format strings which work with str.format() or string.Template. Here's an example console session to show the possibilities:

>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
...                        style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG    This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
...                        style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>

Note that the formatting of logging messages for final output to logs is completely independent of how an individual logging message is constructed. That can still use %-formatting, as shown here:

>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>

Logging calls (logger.debug(), logger.info() etc.) only take positional parameters for the actual logging message itself, with keyword parameters used only for determining options for how to handle the actual logging call (e.g. the exc_info keyword parameter to indicate that traceback information should be logged, or the extra keyword parameter to indicate additional contextual information to be added to the log). So you cannot directly make logging calls using str.format() or string.Template syntax, because internally the logging package uses %-formatting to merge the format string and the variable arguments. There would be no changing this while preserving backward compatibility, since all logging calls which are out there in existing code will be using %-format strings.

There is, however, a way that you can use {}- and $-formatting to construct your individual log messages. Recall that for a message you can use an arbitrary object as a message format string, and that the logging package will call str() on that object to get the actual format string. Consider the following two classes:

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

Either of these can be used in place of a format string, to allow {}- or $-formatting to be used to build the actual "message" part which appears in the formatted log output in place of '%(message)s' or '{message}' or '$message'. It's a little unwieldy to use the class names whenever you want to log something, but it's quite palatable if you use an alias such as __ (double underscore --- not to be confused with _, the single underscore used as a synonym/alias for gettext.gettext() or its brethren).

The above classes are not included in Python, though they're easy enough to copy and paste into your own code. They can be used as follows (assuming that they're declared in a module called wherever):

>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
...       point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

While the above examples use print() to show how the formatting works, you would of course use logger.debug() or similar to actually log using this approach.

One thing to note is that you pay no significant performance penalty with this approach: the actual formatting happens not when you make the logging call, but when (and if) the logged message is actually about to be output to a log by a handler. So the only slightly unusual thing which might trip you up is that the parentheses go around the format string and the arguments, not just the format string. That's because the __ notation is just syntax sugar for a constructor call to one of the XXXMessage classes.
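
The laziness claim can be checked with a small sketch (class invented for the demo) that counts how many times its __str__ is actually invoked:

```python
import io
import logging

class CountingMessage:
    """Counts how often it is actually formatted."""
    formats = 0
    def __str__(self):
        CountingMessage.formats += 1
        return 'expensive formatting result'

stream = io.StringIO()
logger = logging.getLogger('lazy_demo')
logger.propagate = False            # keep the demo to a single handler
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.INFO)

logger.debug(CountingMessage())     # below the level: never formatted
logger.info(CountingMessage())      # emitted: formatted exactly once
```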

If you prefer, you can use a LoggerAdapter to achieve a similar effect to the above, as in the following example:

import logging

class Message:
    def __init__(self, fmt, args):
        self.fmt = fmt
        self.args = args

    def __str__(self):
        return self.fmt.format(*self.args)

class StyleAdapter(logging.LoggerAdapter):
    def __init__(self, logger, extra=None):
        super(StyleAdapter, self).__init__(logger, extra or {})

    def log(self, level, msg, /, *args, **kwargs):
        if self.isEnabledFor(level):
            msg, kwargs = self.process(msg, kwargs)
            self.logger._log(level, Message(msg, args), (), **kwargs)

logger = StyleAdapter(logging.getLogger(__name__))

def main():
    logger.debug('Hello, {}', 'world!')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()

The above script should log the message Hello, world! when run with Python 3.2 or later.

Customizing LogRecord

Every logging event is represented by a LogRecord instance. When an event is logged and not filtered out by a logger's level, a LogRecord is created, populated with information about the event and then passed to the handlers for that logger (and its ancestors, up to and including the logger where further propagation up the hierarchy is disabled). Before Python 3.2, there were only two places where this creation was done:

  • Logger.makeRecord(), which is called in the normal process of logging an event. This invoked LogRecord directly to create an instance.

  • makeLogRecord(), which is called with a dictionary containing attributes to be added to the LogRecord. This is typically invoked when a suitable dictionary has been received over the network (e.g. in pickle form via a SocketHandler, or in JSON form via an HTTPHandler).

This has usually meant that if you need to do anything special with a LogRecord, you've had to do one of the following.

  • Create your own Logger subclass, which overrides Logger.makeRecord(), and set it using setLoggerClass() before any loggers that you care about are instantiated.

  • Add a Filter to a logger or handler, which does the necessary special manipulation you need when its filter() method is called.

The first approach would be a little unwieldy in the scenario where (say) several different libraries wanted to do different things. Each would attempt to set its own Logger subclass, and the one which did this last would win.

The second approach works reasonably well for many cases, but does not allow you to e.g. use a specialized subclass of LogRecord. Library developers can set a suitable filter on their loggers, but they would have to remember to do this every time they introduced a new logger (which they would do simply by adding new packages or modules and doing:

logger = logging.getLogger(__name__)

at module level). It's probably one too many things to think about. Developers could also add the filter to a NullHandler attached to their top-level logger, but this would not be invoked if an application developer attached a handler to a lower-level library logger --- so output from that handler would not reflect the intentions of the library developer.

In Python 3.2 and later, LogRecord creation is done through a factory, which you can specify. The factory is just a callable you can set with setLogRecordFactory(), and interrogate with getLogRecordFactory(). The factory is invoked with the same signature as the LogRecord constructor, as LogRecord is the default setting for the factory.

This approach allows a custom factory to control all aspects of LogRecord creation. For example, you could return a subclass, or just add some additional attributes to the record once created, using a pattern similar to this:

old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
    record = old_factory(*args, **kwargs)
    record.custom_attribute = 0xdecafbad
    return record

logging.setLogRecordFactory(record_factory)

This pattern allows different libraries to chain factories together, and as long as they don't overwrite each other's attributes or unintentionally overwrite the attributes provided as standard, there should be no surprises. However, it should be borne in mind that each link in the chain adds run-time overhead to all logging operations, and the technique should only be used when the use of a Filter does not provide the desired result.
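For instance, two libraries (both hypothetical here) might each wrap whatever factory was current when they initialized; because each wrapper delegates to the previous factory, both custom attributes end up on the record:

```python
import logging

def install_attribute(name, value):
    # Wrap the current factory, adding one attribute; the previous
    # factory (whatever it is) still runs first.
    old_factory = logging.getLogRecordFactory()
    def record_factory(*args, **kwargs):
        record = old_factory(*args, **kwargs)
        setattr(record, name, value)
        return record
    logging.setLogRecordFactory(record_factory)

default_factory = logging.getLogRecordFactory()
try:
    install_attribute('lib_a', 1)   # as if done by one library
    install_attribute('lib_b', 2)   # as if done by another
    record = logging.getLogRecordFactory()(
        'name', logging.INFO, __file__, 1, 'msg', (), None)
    # Both attributes survive, because each factory delegates to the last.
    assert record.lib_a == 1 and record.lib_b == 2
finally:
    logging.setLogRecordFactory(default_factory)   # restore the default
```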

Subclassing QueueHandler - a ZeroMQ example

You can use a QueueHandler subclass to send messages to other kinds of queues, for example a ZeroMQ 'publish' socket. In the example below, the socket is created separately and passed to the handler (as its 'queue'):

import zmq   # using pyzmq, the Python binding for ZeroMQ
import json  # for serializing records portably
from logging.handlers import QueueHandler

ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB)  # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556')        # or wherever

class ZeroMQSocketHandler(QueueHandler):
    def enqueue(self, record):
        self.queue.send_json(record.__dict__)


handler = ZeroMQSocketHandler(sock)

Of course there are other ways of organizing this, for example passing in the data needed by the handler to create the socket:

class ZeroMQSocketHandler(QueueHandler):
    def __init__(self, uri, socktype=zmq.PUB, ctx=None):
        self.ctx = ctx or zmq.Context()
        socket = zmq.Socket(self.ctx, socktype)
        socket.bind(uri)
        super().__init__(socket)

    def enqueue(self, record):
        self.queue.send_json(record.__dict__)

    def close(self):
        self.queue.close()

Subclassing QueueListener - a ZeroMQ example

You can also subclass QueueListener to get messages from other kinds of queues, for example a ZeroMQ 'subscribe' socket. Here's an example:

import logging
from logging.handlers import QueueListener

class ZeroMQSocketListener(QueueListener):
    def __init__(self, uri, /, *handlers, **kwargs):
        self.ctx = kwargs.get('ctx') or zmq.Context()
        socket = zmq.Socket(self.ctx, zmq.SUB)
        socket.setsockopt_string(zmq.SUBSCRIBE, '')  # subscribe to everything
        socket.connect(uri)
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self):
        msg = self.queue.recv_json()
        return logging.makeLogRecord(msg)
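The same enqueue()/dequeue() override pattern works for any transport. The sketch below (class names invented for illustration) mirrors the ZeroMQ examples' send_json()/recv_json() calls with a plain queue.Queue and the json module, so the round trip can be tried without ZeroMQ installed:

```python
import json
import logging
import queue
from logging.handlers import QueueHandler, QueueListener

class JSONQueueHandler(QueueHandler):
    def enqueue(self, record):
        # Serialize the record dict, as ZeroMQSocketHandler does with send_json()
        self.queue.put(json.dumps(record.__dict__))

class JSONQueueListener(QueueListener):
    def dequeue(self, block=True):
        msg = self.queue.get(block)
        if msg is self._sentinel:   # let stop() shut the monitor thread down
            return msg
        # Rebuild a LogRecord from the payload, as the ZeroMQ listener does
        return logging.makeLogRecord(json.loads(msg))

received = []

class Collector(logging.Handler):
    def emit(self, record):
        received.append(record.getMessage())

q = queue.Queue()
listener = JSONQueueListener(q, Collector())
listener.start()
logger = logging.getLogger('qdemo')
logger.addHandler(JSONQueueHandler(q))
logger.warning('hello via queue')
listener.stop()     # drains the queue before returning
assert received == ['hello via queue']
```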

See also

Module logging

API reference for the logging module.

Module logging.config

Configuration API for the logging module.

Module logging.handlers

Useful handlers included with the logging module.

A basic logging tutorial

A more advanced logging tutorial

An example dictionary-based configuration

Below is an example of a logging configuration dictionary - it's taken from the documentation on the Django project. This dictionary is passed to dictConfig() to put the configuration into effect:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
        },
        'simple': {
            'format': '%(levelname)s %(message)s'
        },
    },
    'filters': {
        'special': {
            '()': 'project.logging.SpecialFilter',
            'foo': 'bar',
        }
    },
    'handlers': {
        'null': {
            'level':'DEBUG',
            'class':'django.utils.log.NullHandler',
        },
        'console':{
            'level':'DEBUG',
            'class':'logging.StreamHandler',
            'formatter': 'simple'
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'filters': ['special']
        }
    },
    'loggers': {
        'django': {
            'handlers':['null'],
            'propagate': True,
            'level':'INFO',
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myproject.custom': {
            'handlers': ['console', 'mail_admins'],
            'level': 'INFO',
            'filters': ['special']
        }
    }
}

For more information about this configuration, you can see the relevant section of the Django documentation.

Using a rotator and namer to customize log rotation processing

An example of how you can define a namer and rotator is given in the following snippet, which shows zlib-based compression of the log file:

import logging.handlers
import os
import zlib

def namer(name):
    return name + ".gz"

def rotator(source, dest):
    with open(source, "rb") as sf:
        data = sf.read()
        compressed = zlib.compress(data, 9)
        with open(dest, "wb") as df:
            df.write(compressed)
    os.remove(source)

rh = logging.handlers.RotatingFileHandler(...)
rh.rotator = rotator
rh.namer = namer

These are not "true" .gz files, as they are bare compressed data, lacking the "container" such as you'd find in an actual gzip file. This snippet is just for illustration purposes.
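If you want output readable by standard tools such as gunzip or zcat, the stdlib gzip module writes the container as well. Here is a variant of the rotator above, offered as a sketch rather than a definitive implementation:

```python
import gzip
import os

def namer(name):
    return name + ".gz"

def rotator(source, dest):
    # gzip.open writes a real gzip container, unlike raw zlib output,
    # so the rotated file can be read by gunzip, zcat and friends.
    with open(source, "rb") as sf, gzip.open(dest, "wb") as df:
        df.write(sf.read())
    os.remove(source)
```

As before, these would be attached via the rotator and namer attributes of a RotatingFileHandler.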

A more elaborate multiprocessing example

The following working example shows how logging can be used with multiprocessing using configuration files. The configurations are fairly simple, but serve to illustrate how more complex ones could be implemented in a real multiprocessing scenario.

In the example, the main process spawns a listener process and some worker processes. Each of the main process, the listener and the workers have three separate configurations (the workers all share the same configuration). We can see logging in the main process, how the workers log to a QueueHandler and how the listener implements a QueueListener and a more complex logging configuration, and arranges to dispatch events received via the queue to the handlers specified in the configuration. Note that these configurations are purely illustrative, but you should be able to adapt this example to your own scenario.

Here's the script - the docstrings and the comments hopefully explain how it works:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time

class MyHandler:
    """
    A simple handler for logging events. It runs in the listener process and
    dispatches events to loggers based on the name in the received record,
    which then get dispatched, by the logging system, to the handlers
    configured for those loggers.
    """

    def handle(self, record):
        if record.name == "root":
            logger = logging.getLogger()
        else:
            logger = logging.getLogger(record.name)

        if logger.isEnabledFor(record.levelno):
            # The process name is transformed just to show that it's the listener
            # doing the logging to files and console
            record.processName = '%s (for %s)' % (current_process().name, record.processName)
            logger.handle(record)

def listener_process(q, stop_event, config):
    """
    This could be done in the main process, but is just done in a separate
    process for illustrative purposes.

    This initialises logging according to the specified configuration,
    starts the listener and waits for the main process to signal completion
    via the event. The listener is then stopped, and the process exits.
    """
    logging.config.dictConfig(config)
    listener = logging.handlers.QueueListener(q, MyHandler())
    listener.start()
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    stop_event.wait()
    listener.stop()

def worker_process(config):
    """
    A number of these are spawned for the purpose of illustration. In
    practice, they could be a heterogeneous bunch of processes rather than
    ones which are identical to each other.

    This initialises logging according to the specified configuration,
    and logs a hundred messages with random levels to randomly selected
    loggers.

    A small sleep is added to allow other processes a chance to run. This
    is not strictly needed, but it mixes the output from the different
    processes a bit more than if it's left out.
    """
    logging.config.dictConfig(config)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)
        time.sleep(0.01)

def main():
    q = Queue()
    # The main process gets a simple configuration which prints to the console.
    config_initial = {
        'version': 1,
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO'
            }
        },
        'root': {
            'handlers': ['console'],
            'level': 'DEBUG'
        }
    }
    # The worker process configuration is just a QueueHandler attached to the
    # root logger, which allows all messages to be sent to the queue.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_worker = {
        'version': 1,
        'disable_existing_loggers': True,
        'handlers': {
            'queue': {
                'class': 'logging.handlers.QueueHandler',
                'queue': q
            }
        },
        'root': {
            'handlers': ['queue'],
            'level': 'DEBUG'
        }
    }
    # The listener process configuration shows that the full flexibility of
    # logging configuration is available to dispatch events to handlers however
    # you want.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_listener = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            },
            'simple': {
                'class': 'logging.Formatter',
                'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'simple',
                'level': 'INFO'
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'formatter': 'detailed',
                'level': 'ERROR'
            }
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'handlers': ['console', 'file', 'errors'],
            'level': 'DEBUG'
        }
    }
    # Log some initial events, just to show that logging in the parent works
    # normally.
    logging.config.dictConfig(config_initial)
    logger = logging.getLogger('setup')
    logger.info('About to create workers ...')
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1),
                     args=(config_worker,))
        workers.append(wp)
        wp.start()
        logger.info('Started worker: %s', wp.name)
    logger.info('About to create listener ...')
    stop_event = Event()
    lp = Process(target=listener_process, name='listener',
                 args=(q, stop_event, config_listener))
    lp.start()
    logger.info('Started listener')
    # We now hang around for the workers to finish their work.
    for wp in workers:
        wp.join()
    # Workers all done, listening can now stop.
    # Logging in the parent still works normally.
    logger.info('Telling listener to stop ...')
    stop_event.set()
    lp.join()
    logger.info('All done.')

if __name__ == '__main__':
    main()

Inserting a BOM into messages sent to a SysLogHandler

RFC 5424 requires that a Unicode message be sent to a syslog daemon as a set of bytes which have the following structure: an optional pure-ASCII component, followed by a UTF-8 Byte Order Mark (BOM), followed by Unicode encoded using UTF-8. (See the relevant section of the specification.)

In Python 3.1, code was added to SysLogHandler to insert a BOM into the message, but unfortunately it was implemented incorrectly, with the BOM appearing at the beginning of the message and hence not allowing any pure-ASCII component to appear before it.

As this behaviour is broken, the incorrect BOM insertion code is being removed from Python 3.2.4 and later. However, it is not being replaced, and if you want to produce RFC 5424-compliant messages which include a BOM, an optional pure-ASCII sequence before it and arbitrary Unicode after it, encoded using UTF-8, then you need to do the following:

  1. Attach a Formatter instance to your SysLogHandler instance, with a format string such as:

    'ASCII section\ufeffUnicode section'
    

    When encoded using UTF-8, the Unicode code point U+FEFF will be encoded as a UTF-8 BOM -- the byte-string b'\xef\xbb\xbf'.

  2. Replace the ASCII section with whatever placeholders you like, but make sure that the data that appears in there after substitution is always ASCII (that way, it will remain unchanged after UTF-8 encoding).

  3. Replace the Unicode section with whatever placeholders you like; if the data which appears there after substitution contains characters outside the ASCII range, that's fine -- it will be encoded using UTF-8.

The formatted message will be encoded using UTF-8 encoding by SysLogHandler. If you follow the above rules, you should be able to produce RFC 5424-compliant messages. If you don't, logging may not complain, but your messages will not be RFC 5424-compliant, and your syslog daemon may complain.
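The effect of the rules above can be checked directly by formatting a record with such a format string and inspecting the encoded bytes; a small sanity-check sketch:

```python
import logging

# Format string per the recipe: ASCII section, U+FEFF, then the Unicode section
fmt = logging.Formatter('ASCII section\ufeff%(message)s')
record = logging.LogRecord('app', logging.INFO, __file__, 1,
                           'Unicode section \u00e9', (), None)
encoded = fmt.format(record).encode('utf-8')

# U+FEFF encodes to the UTF-8 BOM bytes, placed after the ASCII part
assert encoded == (b'ASCII section' + b'\xef\xbb\xbf' +
                   'Unicode section \u00e9'.encode('utf-8'))
```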

Implementing structured logging

Although most logging messages are intended for reading by humans, and thus not readily machine-parseable, there might be circumstances where you want to output messages in a structured format which is capable of being parsed by a program (without needing complex regular expressions to parse the log message). This is straightforward to achieve using the logging package. There are a number of ways in which this could be achieved, but the following is a simple approach which uses JSON to serialise the event in a machine-parseable manner:

import json
import logging

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        return '%s >>> %s' % (self.message, json.dumps(self.kwargs))

_ = StructuredMessage   # optional, to improve readability

logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))

If the above script is run, it prints:

message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}

Note that the order of items might be different according to the version of Python used.
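If a stable ordering matters (for example, when diffing logs), json.dumps() accepts sort_keys=True; here is a variant of the class above using it:

```python
import json

class SortedStructuredMessage:
    """Like StructuredMessage above, but with deterministic key order."""
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        # sort_keys=True makes the JSON portion reproducible across runs
        return '%s >>> %s' % (self.message,
                              json.dumps(self.kwargs, sort_keys=True))

assert str(SortedStructuredMessage('m', b=2, a=1)) == 'm >>> {"a": 1, "b": 2}'
```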

If you need more specialised processing, you can use a custom JSON encoder, as in the following complete example:

from __future__ import unicode_literals

import json
import logging

# This next bit is to ensure the script runs unchanged on 2.x and 3.x
try:
    unicode
except NameError:
    unicode = str

class Encoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, set):
            return tuple(o)
        elif isinstance(o, unicode):
            return o.encode('unicode_escape').decode('ascii')
        return super(Encoder, self).default(o)

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        s = Encoder().encode(self.kwargs)
        return '%s >>> %s' % (self.message, s)

_ = StructuredMessage   # optional, to improve readability

def main():
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))

if __name__ == '__main__':
    main()

When the above script is run, it prints:

message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}

Note that the order of items might be different according to the version of Python used.

Customizing handlers with dictConfig()

There are times when you want to customize logging handlers in particular ways, and if you use dictConfig() you may be able to do this without subclassing. As an example, consider that you may want to set the ownership of a log file. On POSIX, this is easily done using shutil.chown(), but the file handlers in the stdlib don't offer built-in support. You can customize handler creation using a plain function such as:

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

You can then specify, in a logging configuration passed to dictConfig(), that a logging handler be created by calling this function:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

In this example I am setting the ownership using the pulse user and group, just for the purposes of illustration. Putting it together into a working script, chowntest.py:

import logging, logging.config, os, shutil

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')

To run this, you will probably need to run as root:

$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log

Note that this example uses Python 3.3 because that's where shutil.chown() makes an appearance. This approach should work with any Python version that supports dictConfig() - namely, Python 2.7, 3.2 or later. With pre-3.3 versions, you would need to implement the actual ownership change using e.g. os.chown().

In practice, the handler-creating function may be located in a utility module somewhere in your project. Instead of the line in the configuration:

'()': owned_file_handler,

you could use e.g.:

'()': 'ext://project.util.owned_file_handler',

where project.util can be replaced with the actual name of the package where the function resides. In the working script above, using 'ext://__main__.owned_file_handler' should work. Here, the actual callable is resolved by dictConfig() from the ext:// specification.

This example hopefully also points the way to how you could implement other types of file change - e.g. setting specific POSIX permission bits - in the same way, using os.chmod().

Of course, the approach could also be extended to types of handler other than a FileHandler - for example, one of the rotating file handlers, or a different type of handler altogether.
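For instance, a factory along the following lines (the function name is invented for this sketch) could create a log file restricted to its owner using os.chmod(); it would be wired into dictConfig() exactly as owned_file_handler was above:

```python
import logging
import os
import stat
import tempfile

def private_file_handler(filename, mode='a', encoding=None):
    # Hypothetical factory: ensure the file exists with 0o600 permissions
    # (owner read/write only) before handing it to FileHandler.
    if not os.path.exists(filename):
        open(filename, 'a').close()
    os.chmod(filename, 0o600)
    return logging.FileHandler(filename, mode, encoding)

# In a dictConfig handler entry you would write, for example:
#   'file': {'()': private_file_handler, 'filename': 'app.log'}
logfile = os.path.join(tempfile.mkdtemp(), 'app.log')
handler = private_file_handler(logfile)
assert stat.S_IMODE(os.stat(logfile).st_mode) == 0o600
handler.close()
```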

Using particular formatting styles throughout your application

In Python 3.2, the Formatter gained a style keyword parameter which, while defaulting to % for backward compatibility, allowed the specification of { or $ to support the formatting approaches supported by str.format() and string.Template. Note that this governs the formatting of logging messages for final output to logs, and is completely orthogonal to how an individual logging message is constructed.

Logging calls (debug(), info() etc.) only take positional parameters for the actual logging message itself, with keyword parameters used only for determining options for how to handle the logging call (e.g. the exc_info keyword parameter to indicate that traceback information should be logged, or the extra keyword parameter to indicate additional contextual information to be added to the log). So you cannot directly make logging calls using str.format() or string.Template syntax, because internally the logging package uses %-formatting to merge the format string and the variable arguments. There would be no changing this while preserving backward compatibility, since all logging calls which are out there in existing code will be using %-format strings.

There have been suggestions to associate format styles with specific loggers, but that approach also runs into backward compatibility problems, because any existing code could be using a given logger name and using %-formatting.

For logging to work interoperably between any third-party libraries and your code, decisions about formatting need to be made at the level of the individual logging call. This opens up a couple of ways in which alternative formatting styles can be accommodated.

Using LogRecord factories

In Python 3.2, along with the Formatter changes mentioned above, the logging package gained the ability to allow users to set their own LogRecord subclasses, using the setLogRecordFactory() function. You can use this to set your own subclass of LogRecord, which does the right thing by overriding the getMessage() method. The base class implementation of this method is where the msg % args formatting happens, and where you can substitute your alternative formatting; however, you should be careful to support all formatting styles and allow %-formatting as the default, to ensure interoperability with other code. Care should also be taken to call str(self.msg), just as the base implementation does.

Refer to the reference documentation on setLogRecordFactory() and LogRecord for more information.
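A minimal sketch of such a subclass is shown below. How to decide which style a message uses is application-specific; the heuristic here (fall back to str.format() when the message contains no % sign) is just one possibility, not a general-purpose solution:

```python
import logging

class BraceRecord(logging.LogRecord):
    def getMessage(self):
        msg = str(self.msg)              # call str(), as the base class does
        if self.args:
            if '%' in msg:
                msg = msg % self.args            # default %-formatting
            else:
                msg = msg.format(*self.args)     # fall back to {}-formatting
        return msg

logging.setLogRecordFactory(BraceRecord)
factory = logging.getLogRecordFactory()
rec = factory('n', logging.INFO, __file__, 1, 'Hello, {}!', ('world',), None)
pct = factory('n', logging.INFO, __file__, 1, 'Hello, %s!', ('world',), None)
assert rec.getMessage() == 'Hello, world!'
assert pct.getMessage() == 'Hello, world!'
logging.setLogRecordFactory(logging.LogRecord)   # restore the default
```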

Using custom message objects

There is another, perhaps simpler way that you can use {}- and $-formatting to construct your individual log messages. You may recall (from Using arbitrary objects as messages) that when logging you can use an arbitrary object as a message format string, and that the logging package will call str() on that object to get the actual format string. Consider the following two classes:

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

Either of these can be used in place of a format string, to allow {}- or $-formatting to be used to build the actual "message" part which appears in the formatted log output in place of "%(message)s" or "{message}" or "$message". If you find it a little unwieldy to use the class names whenever you want to log something, you can make it more palatable if you use an alias such as M or _ for the message (or perhaps __, if you are using _ for localization).

Examples of this approach are given below. Firstly, formatting with str.format():

>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)

Secondly, formatting with string.Template:

>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

One thing to note is that you pay no significant performance penalty with this approach: the actual formatting happens not when you make the logging call, but when (and if) the logged message is actually about to be output to a log by a handler. So the only slightly unusual thing which might trip you up is that the parentheses go around the format string and the arguments, not just the format string. That's because the __ notation is just syntax sugar for a constructor call to one of the XXXMessage classes shown above.

Configuring filters with dictConfig()

You can configure filters using dictConfig(), though it might not be obvious at first glance how to do it (hence this recipe). Since Filter is the only filter class included in the standard library, and it is unlikely to cater to many requirements (it's only there as a base class), you will typically need to define your own Filter subclass with an overridden filter() method. To do this, specify the () key in the configuration dictionary for the filter, specifying a callable which will be used to create the filter (a class is the most obvious, but you can provide any callable which returns a Filter instance). Here is a complete example:

import logging
import logging.config
import sys

class MyFilter(logging.Filter):
    def __init__(self, param=None):
        self.param = param

    def filter(self, record):
        if self.param is None:
            allow = True
        else:
            allow = self.param not in record.msg
        if allow:
            record.msg = 'changed: ' + record.msg
        return allow

LOGGING = {
    'version': 1,
    'filters': {
        'myfilter': {
            '()': MyFilter,
            'param': 'noshow',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['myfilter']
        }
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console']
    },
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.debug('hello')
    logging.debug('hello - noshow')

This example shows how you can pass configuration data to the callable which constructs the instance, in the form of keyword parameters. When run, the above script will print:

changed: hello

which shows that the filter is working as configured.

A couple of extra points to note:

  • If you can't refer to the callable directly in the configuration (e.g. if it lives in a different module, and you can't import it directly where the configuration dictionary is), you can use the form ext://... as described in Access to external objects. For example, you could have used the text 'ext://__main__.MyFilter' instead of MyFilter in the above example.

  • As well as for filters, this technique can also be used to configure custom handlers and formatters. See User-defined objects for more information on how logging supports using user-defined objects in its configuration, and see the other cookbook recipe Customizing handlers with dictConfig() above.
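As noted above, the '()' key need not name a class; any callable returning a suitable filter object will do. The sketch below (all names invented for illustration) uses a plain factory function for the filter, plus a small collecting handler so the effect can be verified:

```python
import logging
import logging.config

collected = []

class CollectHandler(logging.Handler):
    """Hypothetical handler that records level names, for demonstration."""
    def emit(self, record):
        collected.append(record.levelname)

def level_cap_filter(max_level='WARNING'):
    """Factory function (not a class) returning a filter object."""
    cap = getattr(logging, max_level)
    f = logging.Filter()
    # Any object with a suitable filter() method is acceptable
    f.filter = lambda record: record.levelno <= cap
    return f

LOGGING = {
    'version': 1,
    'filters': {
        'cap': {'()': level_cap_filter, 'max_level': 'ERROR'},
    },
    'handlers': {
        'collect': {'()': CollectHandler, 'filters': ['cap']},
    },
    'root': {'level': 'DEBUG', 'handlers': ['collect']},
}

logging.config.dictConfig(LOGGING)
logging.warning('kept')
logging.critical('dropped')   # CRITICAL > ERROR, so the filter rejects it
assert collected == ['WARNING']
```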

Customized exception formatting

There might be times when you want to do customized exception formatting - for argument's sake, let's say you want exactly one line per logged event, even when exception information is present. You can do this with a custom formatter class, as shown in the following example:

import logging

class OneLineExceptionFormatter(logging.Formatter):
    def formatException(self, exc_info):
        """
        Format an exception so that it prints on a single line.
        """
        result = super(OneLineExceptionFormatter, self).formatException(exc_info)
        return repr(result)  # or format into one line however you want to

    def format(self, record):
        s = super(OneLineExceptionFormatter, self).format(record)
        if record.exc_text:
            s = s.replace('\n', '') + '|'
        return s

def configure_logging():
    fh = logging.FileHandler('output.txt', 'w')
    f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
                                  '%d/%m/%Y %H:%M:%S')
    fh.setFormatter(f)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)

def main():
    configure_logging()
    logging.info('Sample message')
    try:
        x = 1 / 0
    except ZeroDivisionError as e:
        logging.exception('ZeroDivisionError: %s', e)

if __name__ == '__main__':
    main()

When run, this produces a file with exactly two lines:

28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n  File "logtest7.py", line 30, in main\n    x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|

While the above treatment is simplistic, it points the way to how exception information can be formatted to your liking. The traceback module may be helpful for more specialized needs.

Speaking logging messages

There might be situations when it is desirable to have logging messages rendered in an audible rather than a visible format. This is easy to do if you have text-to-speech (TTS) functionality available in your system, even if it doesn't have a Python binding. Most TTS systems have a command line program you can run, and this can be invoked from a handler using subprocess. It's assumed here that TTS command line programs won't expect to interact with users or take a long time to complete, that the frequency of logged messages will be not so high as to swamp the user with messages, and that it's acceptable to have the messages spoken one at a time rather than concurrently. The example implementation below waits for one message to be spoken before the next is processed, and this might cause other handlers to be kept waiting. Here is a short example showing the approach, which assumes that the espeak TTS package is available:

import logging
import subprocess
import sys

class TTSHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)
        # Speak slowly in a female English voice
        cmd = ['espeak', '-s150', '-ven+f3', msg]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # wait for the program to finish
        p.communicate()

def configure_logging():
    h = TTSHandler()
    root = logging.getLogger()
    root.addHandler(h)
    # the default formatter just returns the message
    root.setLevel(logging.DEBUG)

def main():
    logging.info('Hello')
    logging.debug('Goodbye')

if __name__ == '__main__':
    configure_logging()
    sys.exit(main())

When run, this script should say "Hello" and then "Goodbye" in a female voice.

Of course, the approach described above can be adapted to other TTS systems and even other systems altogether which can process messages via external programs run from a command line.

Buffering logging messages and outputting them conditionally

There might be situations where you want to log messages in a temporary area and only output them if a certain condition occurs. For example, you may want to start logging debug events in a function, and if the function completes without errors, you don't want to clutter the log with the collected debug information, but if there is an error, you want all the debug information to be output as well as the error.

Here's an example which shows how you could do this using a decorator for your functions where you want logging to behave this way. It makes use of logging.handlers.MemoryHandler, which allows buffering of logged events until some condition occurs, at which point the buffered events are flushed - passed to another handler (the target handler) for processing. By default, the MemoryHandler flushes when its buffer gets filled up or an event whose level is greater than or equal to a specified threshold is seen. You can use this recipe with a more specialised subclass of MemoryHandler if you want custom flushing behavior.

The example script has a simple function, foo, which just cycles through all the logging levels, writing to sys.stderr to say at which level it's about to log, and then actually logging a message at that level. You can pass a parameter to foo which, if true, will log at ERROR and CRITICAL levels - otherwise, it only logs at DEBUG, INFO and WARNING levels.

The script just arranges to decorate foo with a decorator which will do the conditional logging that's required. The decorator takes a logger as a parameter and attaches a memory handler for the duration of the call to the decorated function. The decorator can be additionally parameterised using a target handler, a level at which flushing should occur, and a capacity for the buffer (number of records buffered). These default to a StreamHandler which writes to sys.stderr, logging.ERROR and 100 respectively.

Here's the script:

import logging
from logging.handlers import MemoryHandler
import sys

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
    if target_handler is None:
        target_handler = logging.StreamHandler()
    if flush_level is None:
        flush_level = logging.ERROR
    if capacity is None:
        capacity = 100
    handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)

    def decorator(fn):
        def wrapper(*args, **kwargs):
            logger.addHandler(handler)
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.exception('call failed')
                raise
            finally:
                super(MemoryHandler, handler).flush()
                logger.removeHandler(handler)
        return wrapper

    return decorator

def write_line(s):
    sys.stderr.write('%s\n' % s)

def foo(fail=False):
    write_line('about to log at DEBUG ...')
    logger.debug('Actually logged at DEBUG')
    write_line('about to log at INFO ...')
    logger.info('Actually logged at INFO')
    write_line('about to log at WARNING ...')
    logger.warning('Actually logged at WARNING')
    if fail:
        write_line('about to log at ERROR ...')
        logger.error('Actually logged at ERROR')
        write_line('about to log at CRITICAL ...')
        logger.critical('Actually logged at CRITICAL')
    return fail

decorated_foo = log_if_errors(logger)(foo)

if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    write_line('Calling undecorated foo with False')
    assert not foo(False)
    write_line('Calling undecorated foo with True')
    assert foo(True)
    write_line('Calling decorated foo with False')
    assert not decorated_foo(False)
    write_line('Calling decorated foo with True')
    assert decorated_foo(True)

When this script is run, the following output should be observed:

Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL

As you can see, actual logging output only occurs when an event is logged whose severity is ERROR or greater, but in that case, any previous events at lower severities are also logged.

You can of course use the conventional means of decoration:

@log_if_errors(logger)
def foo(fail=False):
    ...
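The buffering behaviour of MemoryHandler on its own can be seen in a few lines; here the target is a small collecting handler (hypothetical, purely for demonstration):

```python
import logging
from logging.handlers import MemoryHandler

collected = []

class Collect(logging.Handler):
    def emit(self, record):
        collected.append(record.getMessage())

mem = MemoryHandler(capacity=100, flushLevel=logging.ERROR, target=Collect())
logger = logging.getLogger('memdemo')
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(mem)

logger.debug('buffered 1')
logger.info('buffered 2')
assert collected == []           # nothing flushed yet

logger.error('trigger')          # reaches flushLevel: the buffer is flushed
assert collected == ['buffered 1', 'buffered 2', 'trigger']
mem.close()
```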

Formatting times using UTC (GMT) via configuration

Sometimes you want to format times using UTC, which can be done using a class such as UTCFormatter, shown below:

import logging
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

and you can then use the UTCFormatter in your code instead of Formatter. If you want to do that via configuration, you can use the dictConfig() API with an approach illustrated by the following complete example:

import logging
import logging.config
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'utc': {
            '()': UTCFormatter,
            'format': '%(asctime)s %(message)s',
        },
        'local': {
            'format': '%(asctime)s %(message)s',
        }
    },
    'handlers': {
        'console1': {
            'class': 'logging.StreamHandler',
            'formatter': 'utc',
        },
        'console2': {
            'class': 'logging.StreamHandler',
            'formatter': 'local',
        },
    },
    'root': {
        'handlers': ['console1', 'console2'],
    }
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.warning('The local time is %s', time.asctime())

When this script is run, it should print something like:

2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015

showing how the time is formatted both as local time and UTC, one for each handler.
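A quick way to convince yourself that the converter attribute is consulted is to compare Formatter.formatTime() output with time.gmtime() directly; a small sketch:

```python
import logging
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime   # use UTC rather than local time

record = logging.LogRecord('n', logging.INFO, __file__, 1, 'msg', (), None)
stamp = UTCFormatter().formatTime(record, '%Y-%m-%d %H:%M:%S')
# The timestamp matches what time.gmtime() produces for the same instant
assert stamp == time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(record.created))
```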

Using a context manager for selective logging

There are times when it would be useful to temporarily change the logging configuration and revert it back after doing something. For this, a context manager is the most obvious way of saving and restoring the logging context. Here is a simple example of such a context manager, which allows you to optionally change the logging level and add a logging handler purely in the scope of the context manager:

import logging
import sys

class LoggingContext:
    def __init__(self, logger, level=None, handler=None, close=True):
        self.logger = logger
        self.level = level
        self.handler = handler
        self.close = close

    def __enter__(self):
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)
        if self.handler:
            self.logger.addHandler(self.handler)

    def __exit__(self, et, ev, tb):
        if self.level is not None:
            self.logger.setLevel(self.old_level)
        if self.handler:
            self.logger.removeHandler(self.handler)
        if self.handler and self.close:
            self.handler.close()
        # implicit return of None => don't swallow exceptions

If you specify a level value, the logger's level is set to that value in the scope of the with block covered by the context manager. If you specify a handler, it is added to the logger on entry to the block and removed on exit from the block. You can also ask the manager to close the handler for you on block exit - you could do this if you don't need the handler any more.

To illustrate how it works, we can add the following block of code to the above:

if __name__ == '__main__':
    logger = logging.getLogger('foo')
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
    logger.info('1. This should appear just once on stderr.')
    logger.debug('2. This should not appear.')
    with LoggingContext(logger, level=logging.DEBUG):
        logger.debug('3. This should appear once on stderr.')
    logger.debug('4. This should not appear.')
    h = logging.StreamHandler(sys.stdout)
    with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
        logger.debug('5. This should appear twice - once on stderr and once on stdout.')
    logger.info('6. This should appear just once on stderr.')
    logger.debug('7. This should not appear.')

We initially set the logger's level to INFO, so message #1 appears and message #2 doesn't. We then change the level to DEBUG temporarily in the following with block, and so message #3 appears. After the block exits, the logger's level is restored to INFO and so message #4 doesn't appear. In the next with block, we set the level to DEBUG again but also add a handler writing to sys.stdout. Thus, message #5 appears twice on the console (once via stderr and once via stdout). After the with statement's completion, the status is as it was before, so message #6 appears (like message #1) whereas message #7 doesn't (just like message #2).

If we run the resulting script, the result is as follows:

$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

If we run it again, but pipe stderr to /dev/null, we see the following, which is the only message written to stdout:

$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.

Once again, but piping stdout to /dev/null, we get:

$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

In this case, message #5 printed to stdout doesn't appear, as expected.

Of course, the approach described here can be generalised, for example to attach logging filters temporarily. Note that the above code works in Python 2 as well as Python 3.
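As a sketch of that generalisation (the FilterContext name and the sample filter are illustrative, not part of the logging API), a similar context manager can attach a filter only for the duration of a with block:

```python
import logging

# Temporarily attach a filter to a logger, mirroring LoggingContext above.
class FilterContext:
    def __init__(self, logger, filter):
        self.logger = logger
        self.filter = filter

    def __enter__(self):
        self.logger.addFilter(self.filter)

    def __exit__(self, et, ev, tb):
        self.logger.removeFilter(self.filter)
        # implicit return of None => don't swallow exceptions

logger = logging.getLogger('filtered')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

# Since Python 3.2, a filter can be any callable taking a LogRecord and
# returning a truthy value to let the record through.
def only_keep(record):
    return 'keep' in record.getMessage()

with FilterContext(logger, only_keep):
    logger.info('keep: appears while the filter is attached')
    logger.info('drop: suppressed by the filter')
logger.info('appears again: the filter is gone after the block')
```

The same pattern extends to any paired setup/teardown on a logger, since __exit__ runs even when the block raises.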

A CLI application starter template

Here's an example that shows how you can:

  • Use a logging level based on command-line arguments

  • Dispatch to multiple subcommands in separate files, all logging at the same level in a consistent way

  • Make use of simple, minimal configuration

Suppose we have a command-line application whose job is to stop, start or restart some services. For the purposes of illustration, this could be organised as a file app.py that is the main script for the application, with individual commands implemented in start.py, stop.py and restart.py. Suppose further that we want to control the verbosity of the application via a command-line argument, defaulting to logging.INFO. Here's one way that app.py could be written:

import argparse
import importlib
import logging
import os
import sys

def main(args=None):
    scriptname = os.path.basename(__file__)
    parser = argparse.ArgumentParser(scriptname)
    levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    parser.add_argument('--log-level', default='INFO', choices=levels)
    subparsers = parser.add_subparsers(dest='command',
                                       help='Available commands:')
    start_cmd = subparsers.add_parser('start', help='Start a service')
    start_cmd.add_argument('name', metavar='NAME',
                           help='Name of service to start')
    stop_cmd = subparsers.add_parser('stop',
                                     help='Stop one or more services')
    stop_cmd.add_argument('names', metavar='NAME', nargs='+',
                          help='Name of service to stop')
    restart_cmd = subparsers.add_parser('restart',
                                        help='Restart one or more services')
    restart_cmd.add_argument('names', metavar='NAME', nargs='+',
                             help='Name of service to restart')
    options = parser.parse_args()
    # the code to dispatch commands could all be in this file. For the purposes
    # of illustration only, we implement each command in a separate module.
    try:
        mod = importlib.import_module(options.command)
        cmd = getattr(mod, 'command')
    except (ImportError, AttributeError):
        print('Unable to find the code for command \'%s\'' % options.command)
        return 1
    # Could get fancy here and load configuration from file or dictionary
    logging.basicConfig(level=options.log_level,
                        format='%(levelname)s %(name)s %(message)s')
    cmd(options)

if __name__ == '__main__':
    sys.exit(main())

And the start, stop and restart commands can be implemented in separate modules, like so for starting:

# start.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    logger.debug('About to start %s', options.name)
    # actually do the command processing here ...
    logger.info('Started the \'%s\' service.', options.name)

and similarly for stopping:

# stop.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to stop %s', services)
    # actually do the command processing here ...
    logger.info('Stopped the %s service%s.', services, plural)

And similarly for restarting:

# restart.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to restart %s', services)
    # actually do the command processing here ...
    logger.info('Restarted the %s service%s.', services, plural)

If we run this application with the default log level, we get output like this:

$ python app.py start foo
INFO start Started the 'foo' service.

$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

The first word is the logging level, and the second word is the module or package name of the place where the event was logged.

If we change the logging level, then we can change the information sent to the log. For example, if we want more information:

$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.

$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

And if we want less:

$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz

In this case, the commands don't print anything to the console, since nothing at WARNING level or above is logged by them.
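As an aside, the name-joining logic is duplicated verbatim in stop.py and restart.py above; it could be factored into a shared helper. A minimal sketch (the services_and_plural name is hypothetical, not part of the original example):

```python
# Hypothetical shared helper extracting the duplicated logic from
# stop.py and restart.py above.
def services_and_plural(names):
    """Return (quoted names joined with 'and', plural suffix) for a name list."""
    if len(names) == 1:
        return "'%s'" % names[0], ''
    services = ', '.join("'%s'" % name for name in names)
    # Replace the last ', ' with ' and ' for natural English
    i = services.rfind(', ')
    return services[:i] + ' and ' + services[i + 2:], 's'

print(services_and_plural(['foo']))               # ("'foo'", '')
print(services_and_plural(['foo', 'bar', 'baz'])) # ("'foo', 'bar' and 'baz'", 's')
```

Each command module would then call it as `services, plural = services_and_plural(options.names)` before logging.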

A Qt GUI for logging

A question that comes up from time to time is how to log to a GUI application. The Qt framework is a popular cross-platform UI framework with Python bindings provided by the PySide2 and PyQt5 libraries.

The following example shows how to log to a Qt GUI. It introduces a simple QtHandler class which takes a callable, which should be a slot in the main thread that does GUI updates. A worker thread is also created to show how you can log to the GUI from both the UI itself (via a button for manual logging) and from a worker thread doing work in the background (here, just logging messages at random levels with random short delays in between).

The worker thread is implemented using Qt's QThread class rather than the threading module, as there are circumstances where one has to use QThread, which offers better integration with other Qt components.

The code should work with recent releases of either PySide2 or PyQt5. You should be able to adapt the approach to earlier versions of Qt. Please refer to the comments in the code snippet for more detailed information.

import datetime
import logging
import random
import sys
import time

# Deal with minor differences between PySide2 and PyQt5
try:
    from PySide2 import QtCore, QtGui, QtWidgets
    Signal = QtCore.Signal
    Slot = QtCore.Slot
except ImportError:
    from PyQt5 import QtCore, QtGui, QtWidgets
    Signal = QtCore.pyqtSignal
    Slot = QtCore.pyqtSlot


logger = logging.getLogger(__name__)


#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
    signal = Signal(str, logging.LogRecord)

#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
    def __init__(self, slotfunc, *args, **kwargs):
        super(QtHandler, self).__init__(*args, **kwargs)
        self.signaller = Signaller()
        self.signaller.signal.connect(slotfunc)

    def emit(self, record):
        s = self.format(record)
        self.signaller.signal.emit(s, record)

#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
    return QtCore.QThread.currentThread().objectName()


#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)

#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
    @Slot()
    def start(self):
        extra = {'qThreadName': ctname() }
        logger.debug('Started work', extra=extra)
        i = 1
        # Let the thread run until interrupted. This allows reasonably clean
        # thread termination.
        while not QtCore.QThread.currentThread().isInterruptionRequested():
            delay = 0.5 + random.random() * 2
            time.sleep(delay)
            level = random.choice(LEVELS)
            logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
            i += 1

#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):

    COLORS = {
        logging.DEBUG: 'black',
        logging.INFO: 'blue',
        logging.WARNING: 'orange',
        logging.ERROR: 'red',
        logging.CRITICAL: 'purple',
    }

    def __init__(self, app):
        super(Window, self).__init__()
        self.app = app
        self.textedit = te = QtWidgets.QPlainTextEdit(self)
        # Set whatever the default monospace font is for the platform
        f = QtGui.QFont('nosuchfont')
        f.setStyleHint(f.Monospace)
        te.setFont(f)
        te.setReadOnly(True)
        PB = QtWidgets.QPushButton
        self.work_button = PB('Start background work', self)
        self.log_button = PB('Log a message at a random level', self)
        self.clear_button = PB('Clear log window', self)
        self.handler = h = QtHandler(self.update_status)
        # Remember to use qThreadName rather than threadName in the format string.
        fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
        formatter = logging.Formatter(fs)
        h.setFormatter(formatter)
        logger.addHandler(h)
        # Set up to terminate the QThread when we exit
        app.aboutToQuit.connect(self.force_quit)

        # Lay out all the widgets
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(te)
        layout.addWidget(self.work_button)
        layout.addWidget(self.log_button)
        layout.addWidget(self.clear_button)
        self.setFixedSize(900, 400)

        # Connect the non-worker slots and signals
        self.log_button.clicked.connect(self.manual_update)
        self.clear_button.clicked.connect(self.clear_display)

        # Start a new worker thread and connect the slots for the worker
        self.start_thread()
        self.work_button.clicked.connect(self.worker.start)
        # Once started, the button should be disabled
        self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))

    def start_thread(self):
        self.worker = Worker()
        self.worker_thread = QtCore.QThread()
        self.worker.setObjectName('Worker')
        self.worker_thread.setObjectName('WorkerThread')  # for qThreadName
        self.worker.moveToThread(self.worker_thread)
        # This will start an event loop in the worker thread
        self.worker_thread.start()

    def kill_thread(self):
        # Just tell the worker to stop, then tell it to quit and wait for that
        # to happen
        self.worker_thread.requestInterruption()
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()
        else:
            print('worker has already exited.')

    def force_quit(self):
        # For use when the window is closed
        if self.worker_thread.isRunning():
            self.kill_thread()

    # The functions below update the UI and run in the main thread because
    # that's where the slots are set up

    @Slot(str, logging.LogRecord)
    def update_status(self, status, record):
        color = self.COLORS.get(record.levelno, 'black')
        s = '<pre><font color="%s">%s</font></pre>' % (color, status)
        self.textedit.appendHtml(s)

    @Slot()
    def manual_update(self):
        # This function uses the formatted message passed in, but also uses
        # information from the record to format the message in an appropriate
        # color according to its severity (level).
        level = random.choice(LEVELS)
        extra = {'qThreadName': ctname() }
        logger.log(level, 'Manually logged!', extra=extra)

    @Slot()
    def clear_display(self):
        self.textedit.clear()


def main():
    QtCore.QThread.currentThread().setObjectName('MainThread')
    logging.getLogger().setLevel(logging.DEBUG)
    app = QtWidgets.QApplication(sys.argv)
    example = Window(app)
    example.show()
    sys.exit(app.exec_())

if __name__=='__main__':
    main()