python - Why do multiprocessing.Process() and concurrent.futures.ProcessPoolExecutor() give different log output with logging.handlers.QueueHandler()?
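Problem description
I am trying to make my logging compatible with parallel processing via concurrent.futures.ProcessPoolExecutor.
I am following this example.
Here is the code from the example (with worker_process() and main() modified):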
# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
import concurrent.futures
# Next two import lines for this demo only
from random import choice, random
import time
import os
#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 3000000, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)
# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    old_record = ''
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            if not record == old_record:
                logger = logging.getLogger(record.name)
                logger.handle(record)  # No level or filter logic applied - just do it!
            old_record = record
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)
# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]
LOGGERS = ['a.b.c', 'd.e.f']
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)
# This is the worker task, which configures logging and then logs a single
# event after a random delay.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer, k):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    time.sleep(random())
    logger = logging.getLogger(choice(LOGGERS))
    level = choice(LEVELS)
    message = f'Message #{k}'
    logger.log(level, message)
    print('Worker finished: %s' % name)
# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, run n worker tasks (as separate processes or through a process
# pool, depending on mode), wait for them to finish, then send a None to the
# queue to tell the listener to finish.
def main(mode):
    n = 30
    queue = multiprocessing.Manager().Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    if mode == 'multiprocessing':
        workers = []
        for i in range(n):
            worker = multiprocessing.Process(target=worker_process,
                                             args=(queue, worker_configurer, i))
            workers.append(worker)
            worker.start()
        for w in workers:
            w.join()
    elif mode == 'concurrent.futures':
        with concurrent.futures.ProcessPoolExecutor(max_workers=n) as executor:
            for i in range(n):
                executor.submit(worker_process, queue, worker_configurer, i)
    queue.put_nowait(None)
    listener.join()
if __name__ == '__main__':
    if os.path.exists('mptest.log'):
        os.unlink('mptest.log')
    main(mode='multiprocessing')
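If I set mode='multiprocessing', the resulting log file has 30 lines of text, as I expect.
However, if I set mode='concurrent.futures', the resulting log file has between 30 and 60 lines of text, and the count varies from run to run.
Do you have any idea why this happens and how I can fix it?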
Solution
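No answer was captured on this page, so what follows is a likely explanation rather than a confirmed one. Each multiprocessing.Process runs worker_process exactly once, so worker_configurer adds exactly one QueueHandler per process. A ProcessPoolExecutor may reuse a worker process for several tasks, and because worker_process calls configurer(queue) on every task, a reused process would end up with two or more QueueHandler instances on its root logger and put each record on the queue that many times. The sketch below simulates a reused worker inside a single process with a plain queue.Queue. The names configure_once, count_records, log_message and run_pool are hypothetical helpers introduced for illustration and do not appear in the question.

```python
import logging
import logging.handlers
import queue as queue_module
from concurrent.futures import ProcessPoolExecutor


def worker_configurer(q):
    # Same behaviour as the question's configurer: every call adds a new handler.
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(q))
    root.setLevel(logging.DEBUG)


def configure_once(q):
    # Hypothetical idempotent variant: skip if a QueueHandler is already attached.
    root = logging.getLogger()
    if not any(isinstance(h, logging.handlers.QueueHandler) for h in root.handlers):
        root.addHandler(logging.handlers.QueueHandler(q))
    root.setLevel(logging.DEBUG)


def count_records(configurer, tasks=2):
    # Simulate one reused worker process running `tasks` jobs back to back,
    # then report how many records reached the queue.
    root = logging.getLogger()
    saved_handlers, saved_level = root.handlers[:], root.level
    root.handlers.clear()
    q = queue_module.Queue()
    try:
        for k in range(tasks):
            configurer(q)
            logging.getLogger('a.b.c').info('Message #%d', k)
    finally:
        root.handlers[:] = saved_handlers
        root.setLevel(saved_level)
    return q.qsize()


def log_message(k):
    # Worker task that relies on the pool initializer for logging setup.
    logging.getLogger('a.b.c').info('Message #%d', k)


def run_pool(q, n):
    # Not executed in this sketch. `q` is assumed to be a picklable queue such
    # as the question's multiprocessing.Manager().Queue(). The initializer runs
    # once per worker process rather than once per task.
    with ProcessPoolExecutor(max_workers=n, initializer=worker_configurer,
                             initargs=(q,)) as executor:
        for k in range(n):
            executor.submit(log_message, k)


duplicated = count_records(worker_configurer)  # 1 record, then 2 records
deduplicated = count_records(configure_once)   # 1 record per task
print(duplicated, deduplicated)
```

Under these assumptions, two tasks in the same process yield three queued records with the original configurer and two with the guarded one. Applied to the question's code, either approach should bring the log file back to one line per message: make the configurer idempotent, or pass it through the initializer and initargs parameters of ProcessPoolExecutor (as in run_pool) and remove the configurer(queue) call from worker_process.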