python - 如何缓冲每个工作进程的日志并在进程完成时刷新它
问题描述
这是我到目前为止所拥有的:
import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def worker_process(q):
    """Attach a MemoryHandler (targeting a QueueHandler) to root and emit 4 records."""
    queue_handler = logging.handlers.QueueHandler(q)
    buffering_handler = logging.handlers.MemoryHandler(
        10000,
        logging.CRITICAL,
        target=queue_handler,
        flushOnClose=True,
    )
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(buffering_handler)
    for msg_no in range(4):
        logging.getLogger().log(logging.ERROR, 'Message no. %d', msg_no)
    # Closing flushes every buffered record to the queue in one burst.
    buffering_handler.close()


if __name__ == '__main__':
    q = Manager().Queue()
    with ProcessPoolExecutor(max_workers=3) as executor:
        for _ in range(4):
            executor.submit(worker_process, q)
    logger = logging.getLogger()
    # NOTE(review): this drain loop never terminates -- no sentinel is written.
    while True:
        logger.handle(q.get())
根据 MemoryHandler 的文档,我期望 memoryhandler 会先缓冲日志记录,并在调用 memoryhandler.close()
时将它们一次性全部刷新到队列中。
但是,这不是这里发生的事情:
Message no. 0
Message no. 0
Message no. 1
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 1
Message no. 2
Message no. 2
Message no. 3
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
为什么会这样?
有没有办法使用日志库获得以下输出?
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
解决方案
问题在于您忘了这是一个多进程应用程序:各个进程在并行地向传入的队列写入,因此日志记录会以您看到的任意顺序出现(最后 4 条除外——因为池中只有 3 个进程,第 4 次 submit
调用要等前 3 次 submit
调用结束后才会真正运行,所以它的记录会单独、完整地输出)。
还有一个额外的问题:由于 while True:
循环,您的主进程永远不会终止。这可以通过让每个任务在写完实际的日志记录之后,再向队列写入一条额外的哨兵记录(例如 None)来解决。
然后主进程循环读取,直到看到 4 条这样的哨兵记录为止;当然,哨兵记录本身会被忽略。
解决方案是使用一个在 close
被调用之前不会刷新记录的 MemoryHandler
(为确保无论正在写入的记录的日志级别是什么都不会提前刷新,我们可以子类化这个类并覆盖 shouldFlush
方法),然后确保所有记录的刷新都在一个 Lock
的控制下进行,这样一次只有一个进程在刷新:
import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Lock, cpu_count
import sys
class MyMemoryHandler(logging.handlers.MemoryHandler):
    """A MemoryHandler that never flushes early: only a full buffer (or close) flushes."""

    def shouldFlush(self, record):
        # Deliberately ignore the record's level; flush only on a full buffer.
        return len(self.buffer) >= self.capacity
def init_pool(the_lock):
    """Pool initializer: publish the shared lock as a module-level global."""
    global lock
    lock = the_lock
def worker_process():
    """Emit 4 CRITICAL records into a buffering handler, then flush them all
    at once to stderr while holding the shared ``lock`` (set by ``init_pool``)."""
    handler = MyMemoryHandler(
        10000,
        logging.CRITICAL,
        target=logging.StreamHandler(sys.stderr),
        flushOnClose=True,
    )
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(handler)
    for n in range(4):
        # Even CRITICAL records stay buffered now (see MyMemoryHandler.shouldFlush):
        logging.getLogger().log(logging.CRITICAL, 'Message no. %d', n)
    # Serialize the flush so each process's records come out contiguously.
    with lock:
        handler.close()
if __name__ == '__main__':
    lock = Lock()
    # Pool size 4 so all four tasks really run concurrently.
    with ProcessPoolExecutor(max_workers=4,
                             initializer=init_pool, initargs=(lock,)) as executor:
        for _ in range(4):
            executor.submit(worker_process)
印刷:
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
如果您想像以前一样使用队列而不是无限循环,那么可以使用下面修改后的代码。请注意,我已经用一个 multiprocessing.Queue
实例替换了托管队列,它的性能要好得多。但是,为了确保不发生死锁,主进程不能等到写入进程全部完成后才从队列中读取消息,因为在没有读取方时,写入方可能会在向队列写入记录时阻塞。这就是为什么我把从队列读取记录的代码移到了 with ProcessPoolExecutor(...) as executor:
块内。我还通过池初始化函数把队列实例放进每个进程的全局变量 q
中,使其可供工作函数使用(它不能作为参数传递,否则在处理池被清理时一切都会挂起)。
import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue, Lock, cpu_count
import sys
class MyMemoryHandler(logging.handlers.MemoryHandler):
    """MemoryHandler variant whose only flush trigger is a full buffer (or close())."""

    def shouldFlush(self, record):
        # ``record.levelno`` is intentionally not consulted here.
        return len(self.buffer) >= self.capacity
def init_pool(the_lock, the_queue):
    """Pool initializer: expose the shared lock and queue as globals in each worker."""
    global lock, q
    lock, q = the_lock, the_queue


# Marker each finished task writes to the queue; the reader counts and discards it.
SENTINEL = None
def worker_process():
    """Buffer 4 CRITICAL records, flush them to the shared queue ``q`` in one
    locked batch, then append SENTINEL so the reader can count finished tasks."""
    queue_handler = logging.handlers.QueueHandler(q)
    handler = MyMemoryHandler(
        10000,
        logging.CRITICAL,
        target=queue_handler,
        flushOnClose=True,
    )
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(handler)
    for n in range(4):
        # Even CRITICAL records stay buffered until close() (see MyMemoryHandler):
        logging.getLogger().log(logging.CRITICAL, 'Message no. %d', n)
    with lock:
        handler.close()
        # NOTE(review): sentinel kept inside the lock so it lands right after
        # this task's records -- the flattened original is ambiguous here.
        q.put(SENTINEL)  # write sentinel
if __name__ == '__main__':
    lock = Lock()
    q = Queue()
    N_TASKS = 4
    with ProcessPoolExecutor(max_workers=min(cpu_count(), N_TASKS),
                             initializer=init_pool, initargs=(lock, q)) as executor:
        for _ in range(N_TASKS):
            executor.submit(worker_process)
        # Drain the queue INSIDE the with-block: writers can block on a full
        # queue if nobody reads, so read while the tasks are still running.
        logger = logging.getLogger()
        seen_sentinels = 0
        while seen_sentinels < N_TASKS:
            record = q.get()
            if record == SENTINEL:
                seen_sentinels += 1
            else:
                logger.handle(record)
印刷:
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
LogHandler 类
下面的 LogHandler
类封装了上面代码中分散的大部分逻辑。我们既不需要保留对所创建的 LogHandler
实例的引用(实例化它就足够了),也不需要显式地调用它的 close
方法。
import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Lock, cpu_count, current_process
import sys
class LogHandler(logging.handlers.MemoryHandler):
    """Self-installing MemoryHandler: buffers every record regardless of level
    and, on flush/close, writes the whole buffer to a stream while holding a
    shared lock so each process's output comes out contiguously.

    Instantiating it attaches it to the root logger, so callers need not keep
    a reference nor call ``close`` explicitly.
    """

    def __init__(self,
                 lock,
                 *,
                 level=logging.DEBUG,
                 stream=sys.stdout,
                 capacity=10000,
                 format=False
                 ):
        # ``lock`` must be a "suitable" lock -- shareable across the processes
        # that will flush (e.g. a multiprocessing.Lock).
        self._lock = lock
        stream_handler = logging.StreamHandler(stream)
        if format:
            stream_handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        super().__init__(capacity,
                         logging.CRITICAL,
                         target=stream_handler,
                         flushOnClose=True)
        root = logging.getLogger()
        root.setLevel(level)
        root.addHandler(self)  # self-install on the root logger

    def shouldFlush(self, record):
        """Flush only when the buffer is full; the record's level is irrelevant."""
        return len(self.buffer) >= self.capacity

    def flush(self):
        """Flush the entire buffer under the shared lock (serializes output)."""
        with self._lock:
            super().flush()
####################################################
def init_pool(the_lock):
    """Pool initializer: store the shared lock in a module-level global."""
    global lock
    lock = the_lock
def worker_process():
    """Install a LogHandler and emit 4 CRITICAL records; everything is flushed
    in one locked batch when the handler is closed at process teardown."""
    import time

    # Give each process in the pool a chance to start before logging begins.
    time.sleep(.1)
    LogHandler(lock, format=True)  # self-installs on root; formatted output
    logger = logging.getLogger(str(current_process().pid))
    for n in range(4):
        time.sleep(.5)
        logger.log(logging.CRITICAL, 'message no. %d', n)
if __name__ == '__main__':
    lock = Lock()
    with ProcessPoolExecutor(max_workers=min(cpu_count(), 4),
                             initializer=init_pool, initargs=(lock,)) as executor:
        for _ in range(4):
            executor.submit(worker_process)
印刷:
2021-07-05 07:22:07,925 - 20148 - CRITICAL - message no. 0
2021-07-05 07:22:08,425 - 20148 - CRITICAL - message no. 1
2021-07-05 07:22:08,926 - 20148 - CRITICAL - message no. 2
2021-07-05 07:22:09,426 - 20148 - CRITICAL - message no. 3
2021-07-05 07:22:07,926 - 10864 - CRITICAL - message no. 0
2021-07-05 07:22:08,426 - 10864 - CRITICAL - message no. 1
2021-07-05 07:22:08,927 - 10864 - CRITICAL - message no. 2
2021-07-05 07:22:09,427 - 10864 - CRITICAL - message no. 3
2021-07-05 07:22:07,929 - 8528 - CRITICAL - message no. 0
2021-07-05 07:22:08,429 - 8528 - CRITICAL - message no. 1
2021-07-05 07:22:08,930 - 8528 - CRITICAL - message no. 2
2021-07-05 07:22:09,430 - 8528 - CRITICAL - message no. 3
2021-07-05 07:22:07,930 - 21200 - CRITICAL - message no. 0
2021-07-05 07:22:08,430 - 21200 - CRITICAL - message no. 1
2021-07-05 07:22:08,931 - 21200 - CRITICAL - message no. 2
2021-07-05 07:22:09,431 - 21200 - CRITICAL - message no. 3
推荐阅读
- java - 在android中将PDF文件转换为字节数组
- c# - 如何将 Filestream 与嵌入式资源文件一起使用?
- javascript - 从 TableSelectDialog 中删除/隐藏搜索栏
- android - Co Routines 从后台获取位置
- asynchronous - 将 Rust 编译为 wasm(Web 程序集)时,我如何才能睡 10 毫秒?
- python-3.x - 识别具有所有(或几乎所有)NaN 值的列,其中目标变量是某个值
- angular - ./node_modules/ng2-charts/fesm5/ng2-charts.js 230:54-72 “在‘@angular/core’中找不到导出‘ɵɵdefineInjectable’
- pyspark - 如何从 Azure 数据工厂运行 python egg(存在于 azure databricks 中)?
- javascript - JavaScript setInterval() 在 Windows CE 设备上不起作用
- flutter - 如何在 Flutter 中使用另一个提供者内部的提供者