Logging in Python Multiprocessing with the logging Module

freedom-wan 2020-08-15 16:37

The logging module is thread-safe but not process-safe. A convenient and workable approach is to run a single listener process that handles the log records produced by all the other processes, using a queue for communication.
Below is an example:

import logging
# logging.handlers must be imported explicitly: for some packages, submodules
# are not imported automatically, which comes down to what the package's
# __init__.py does. See https://stackoverflow.com/a/3781554/10588903
import logging.handlers
import multiprocessing
from random import choice, random
import time


# Configuration for the listener; this can be adjusted as needed
def listener_configurer():
    # return logger object
    root = logging.getLogger()
    h = logging.FileHandler('mptest.log', 'a')
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them; quit when a None is received as
# the LogRecord. Best left unmodified.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    # Constructor for a FIFO queue.
    # maxsize is an integer that sets the upper bound on the number of items
    # that can be placed in the queue. Insertion will block once this size has
    # been reached, until queue items are consumed. If maxsize is less than or
    # equal to zero (as with the -1 here), the queue size is infinite.
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()
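
Since Python 3.2 the standard library also ships logging.handlers.QueueListener, which runs the same drain loop on a background thread inside an existing process, so a dedicated listener process is not strictly required. Below is a minimal sketch of the same pattern using it, reusing the file name and format string from the example above; the worker count of three is arbitrary.

import logging
import logging.handlers
import multiprocessing

def worker(queue):
    # Each worker only needs a QueueHandler; records travel back over the queue.
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(queue))
    root.setLevel(logging.DEBUG)
    root.info('hello from %s', multiprocessing.current_process().name)

def main():
    queue = multiprocessing.Queue(-1)
    h = logging.FileHandler('mptest.log', 'a')
    h.setFormatter(logging.Formatter(
        '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'))
    # QueueListener drains the queue on a background thread in this process
    # and hands each record to the handlers passed here.
    listener = logging.handlers.QueueListener(queue, h)
    listener.start()
    workers = [multiprocessing.Process(target=worker, args=(queue,))
               for _ in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    listener.stop()  # stop() enqueues its own sentinel and joins the thread

if __name__ == '__main__':
    main()

Note that listener.stop() enqueues its own sentinel and joins the internal thread, so there is no need to put None on the queue manually as in the listener-process version.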
