How to buffer each worker process's log records and flush them when the process finishes

Problem description

Here is what I have so far:

import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def worker_process(q):

    qh = logging.handlers.QueueHandler(q)
    memoryhandler = logging.handlers.MemoryHandler(
        10000,
        logging.CRITICAL,
        target=qh,
        flushOnClose=True
    )
    
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(memoryhandler)

    for i in range(4):
        logger = logging.getLogger()
        logger.log(logging.ERROR, 'Message no. %d', i)
    
    memoryhandler.close()


if __name__ == '__main__':
    q = Manager().Queue()

    with ProcessPoolExecutor(max_workers=3) as executor:
        for i in range(4):
            executor.submit(worker_process, q)

    logger = logging.getLogger()
    while True:
        record = q.get()
        logger.handle(record)

Based on the MemoryHandler documentation, I expected my memoryhandler to buffer the log records and flush them to the queue all at once when memoryhandler.close() is called.
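
For reference, this is the buffering behavior I expected, reduced to a single process (a minimal sketch that uses a StreamHandler target instead of the queue, just so it runs on its own):

import logging, logging.handlers
import sys

# Buffer records in memory; with flushOnClose=True they are written
# to the target handler all at once when close() is called.
handler = logging.handlers.MemoryHandler(
    10000,                  # capacity
    logging.CRITICAL,       # flushLevel: ERROR records do not trigger a flush
    target=logging.StreamHandler(sys.stderr),
    flushOnClose=True
)

root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(handler)

for i in range(4):
    logging.getLogger().log(logging.ERROR, 'Message no. %d', i)

# Nothing has been written yet; all four records appear together now:
handler.close()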

However, that is not what happens here:

Message no. 0
Message no. 0
Message no. 1
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 1
Message no. 2
Message no. 2
Message no. 3
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3

Why is this happening?

Is there a way, using the logging library, to get the following output instead?

Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3

Tags: python logging multiprocessing

Solution


The problem is that you have forgotten this is a multiprocessing application: your processes write to the passed queue in parallel, so the records arrive in the arbitrary interleaved order you are seeing. The exception is the last 4 records, because you only have 3 processes in the pool and the 4th submitted task does not actually run until the first 3 have finished, so it runs entirely on its own.
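
You can see the pool's scheduling in a small standalone sketch (hypothetical timing code, separate from the solution): with max_workers=3, the fourth submitted task starts only after one of the first three workers has become free.

from concurrent.futures import ProcessPoolExecutor
import os, time

def task(i):
    print('task %d started in pid %d' % (i, os.getpid()), flush=True)
    time.sleep(1)  # keep the worker busy

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        for i in range(4):
            executor.submit(task, i)
    # tasks 0-2 start immediately in parallel; task 3 starts about
    # a second later, once a worker is free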

There is an additional problem: your main process never terminates, because of your while True: loop. This can be solved by having each task write an extra sentinel record (None, for example) to the queue after its real log records. The main process then loops until it has seen 4 of these sentinel records, which it of course does not log; the second solution below does exactly this.

The solution is to use a regular MemoryHandler that does not flush its records until close is called (and to make sure no flushing happens earlier, regardless of the level of the records being written, we can subclass the class and override its shouldFlush method), but then to ensure that all the records are flushed under the control of a Lock, so that only one process flushes at a time:

import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Lock
import sys

class MyMemoryHandler(logging.handlers.MemoryHandler):
    def shouldFlush(self, record):
        """
        Flush only when the buffer is full, never because of a record's level.
        """
        return (len(self.buffer) >= self.capacity)

def init_pool(the_lock):
    global lock
    lock = the_lock

def worker_process():
    memoryhandler = MyMemoryHandler(
        10000,
        logging.CRITICAL,
        target=logging.StreamHandler(sys.stderr),
        flushOnClose=True
    )

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(memoryhandler)

    for i in range(4):
        logger = logging.getLogger()
        # You can even write out CRITICAL level records now without flushing occurring:
        logger.log(logging.CRITICAL, 'Message no. %d', i)

    with lock:
        memoryhandler.close()


if __name__ == '__main__':
    lock = Lock()
    # Note we are now using a pool size of 4
    with ProcessPoolExecutor(max_workers=4, initializer=init_pool, initargs=(lock,)) as executor:
        for i in range(4):
            executor.submit(worker_process)

Prints:

Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
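
A note on the design: the overridden shouldFlush checks only the buffer size, and with a capacity of 10000 it never triggers here, so each worker accumulates all of its records until close() is called. Taking the lock around close() then guarantees that one worker's entire buffer is flushed to stderr before another worker's flush can begin, which is what keeps each group of four messages together.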

If you would rather keep using a queue, as before, instead of the infinite loop, you can use the code modified as below. Note that I have replaced the managed queue with a multiprocessing.Queue instance, which performs much better. However, to make sure there are no deadlocks, the main process must not wait for the worker processes to complete before reading the messages from the queue, because a writer can block when putting a record on the queue if nobody is reading. That is why I have moved the code that reads records from the queue inside the with ProcessPoolExecutor(...) as executor: block. I have also made the queue instance available to the worker function through a global variable q initialized in each pool process; it cannot be passed as an argument, or everything hangs when the pool processes the tasks.

import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue, Lock, cpu_count

class MyMemoryHandler(logging.handlers.MemoryHandler):
    def shouldFlush(self, record):
        """
        Flush only when the buffer is full, never because of a record's level.
        """
        return (len(self.buffer) >= self.capacity)

def init_pool(the_lock, the_queue):
    global lock, q
    lock = the_lock
    q = the_queue

SENTINEL = None

def worker_process():

    qh = logging.handlers.QueueHandler(q)
    memoryhandler = MyMemoryHandler(
        10000,
        logging.CRITICAL,
        target=qh,
        flushOnClose=True
    )

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(memoryhandler)

    for i in range(4):
        logger = logging.getLogger()
        # You can even write out CRITICAL level records now without flushing occurring:
        logger.log(logging.CRITICAL, 'Message no. %d', i)

    with lock:
        memoryhandler.close()
        q.put(SENTINEL) # write sentinel


if __name__ == '__main__':
    lock = Lock()
    q = Queue()
    N_TASKS = 4
    with ProcessPoolExecutor(max_workers=min(cpu_count(), N_TASKS), initializer=init_pool, initargs=(lock, q)) as executor:
        for i in range(N_TASKS):
            executor.submit(worker_process)

        logger = logging.getLogger()
        seen_sentinels = 0
        while seen_sentinels < N_TASKS:
            record = q.get()
            if record == SENTINEL:
                seen_sentinels += 1
            else:
                logger.handle(record)

Prints:

Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3

The LogHandler class

The LogHandler class below encapsulates most of the logic that was scattered through the code above. We do not need to keep a reference to the LogHandler instance we create (instantiating it is enough), nor do we need to call close on it explicitly.

import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Lock, cpu_count, current_process
import sys

class LogHandler(logging.handlers.MemoryHandler):
    def __init__(self,
                 lock,
                 *,
                 level=logging.DEBUG,
                 stream=sys.stdout,
                 capacity=10000,
                 format=False
                 ):
        self._lock = lock # a "suitable" lock

        stream_handler = logging.StreamHandler(stream)
        if format:
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            stream_handler.setFormatter(formatter)

        super().__init__(capacity,
                       logging.CRITICAL,
                       target=stream_handler,
                       flushOnClose=True
                       )

        root = logging.getLogger()
        root.setLevel(level)
        root.addHandler(self)

    def shouldFlush(self, record):
        """
        Flush only when the buffer is full, never because of a record's level.
        """
        return (len(self.buffer) >= self.capacity)

    def flush(self):
        """ serialize """
        with self._lock:
            super().flush()


####################################################

def init_pool(the_lock):
    global lock
    lock = the_lock

def worker_process():
    import time

    # give each process in the pool a chance to run:
    time.sleep(.1)

    LogHandler(lock, format=True) # formatted output

    logger = logging.getLogger(str(current_process().pid))
    for i in range(4):
        time.sleep(.5)
        logger.log(logging.CRITICAL, 'message no. %d', i)


if __name__ == '__main__':
    lock = Lock()
    with ProcessPoolExecutor(max_workers=min(cpu_count(), 4), initializer=init_pool, initargs=(lock,)) as executor:
        for i in range(4):
            executor.submit(worker_process)

Prints:

2021-07-05 07:22:07,925 - 20148 - CRITICAL - message no. 0
2021-07-05 07:22:08,425 - 20148 - CRITICAL - message no. 1
2021-07-05 07:22:08,926 - 20148 - CRITICAL - message no. 2
2021-07-05 07:22:09,426 - 20148 - CRITICAL - message no. 3
2021-07-05 07:22:07,926 - 10864 - CRITICAL - message no. 0
2021-07-05 07:22:08,426 - 10864 - CRITICAL - message no. 1
2021-07-05 07:22:08,927 - 10864 - CRITICAL - message no. 2
2021-07-05 07:22:09,427 - 10864 - CRITICAL - message no. 3
2021-07-05 07:22:07,929 - 8528 - CRITICAL - message no. 0
2021-07-05 07:22:08,429 - 8528 - CRITICAL - message no. 1
2021-07-05 07:22:08,930 - 8528 - CRITICAL - message no. 2
2021-07-05 07:22:09,430 - 8528 - CRITICAL - message no. 3
2021-07-05 07:22:07,930 - 21200 - CRITICAL - message no. 0
2021-07-05 07:22:08,430 - 21200 - CRITICAL - message no. 1
2021-07-05 07:22:08,931 - 21200 - CRITICAL - message no. 2
2021-07-05 07:22:09,431 - 21200 - CRITICAL - message no. 3
