首页 > 解决方案 > 当日志在不同的进程中发出时,caplog 中的空消息

问题描述

我正在使用 log_cli=true 运行测试。剧本:

import logging
import sys
from multiprocessing import Process

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

logger = logging.getLogger("leapp.actors.quagga_report")


class ActorContext:
    def __init__(self):
        self.log = logger

    def run(self):
        self.log.debug("Some msg")


current_actor_context = ActorContext()


def test_caplog_fails(caplog):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        p = Process(target=current_actor_context.run)
        p.start()
        p.join()
    assert "Some msg" in caplog.text


def test_caplog_passes(caplog):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        current_actor_context.run()
    assert "Some msg" in caplog.text

pytest log_cli 在两个测试中都显示了日志消息,但是,caplog 只看到第二个测试的消息。

第一次测试失败并出现以下回溯:

-------------------------------- live log call ---------------------------------
| 13:39:20 | 40212 | leapp.actors.quagga_report | DEBUG | test_logger_caplog_fails.py | Some msg
FAILED
tests/test_logger_caplog_fails.py:20 (test_caplog_fails)
Traceback (most recent call last):
  File "/home/azhukov/Dropbox/code/lighting_talks/asyncio_subprocess_shells/tests/test_logger_caplog_fails.py", line 26, in test_caplog_fails
    assert "Some msg" in caplog.text
AssertionError: assert 'Some msg' in ''
 +  where '' = <_pytest.logging.LogCaptureFixture object at 0x7fb8a87f2370>.text

我正在寻找一个类似的问题Pytest capture not working - caplog 和 capsys 是空的,但在我的情况下,属性propagate=True

标签: pythonpytestfixtures

解决方案


受到@hoefling 的启发,并且仍然愿意使用 caplog,有一个解决方案。这个想法是创建一个fixture,它从QueueHandler处理程序中获取队列并重新发送主进程中的日志,这可以被caplog捕获

import logging
import sys
from contextlib import contextmanager
from logging import handlers
from multiprocessing import Process, Queue

import pytest

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)


logger = logging.getLogger(__name__)


class ActorContext:
    def __init__(self):
        self.log = logger

    def run(self):
        self.log.debug("Some msg")


current_actor_context = ActorContext()


@pytest.fixture()
def caplog_workaround():
    @contextmanager
    def ctx():
        logger_queue = Queue()
        logger = logging.getLogger()
        logger.addHandler(handlers.QueueHandler(logger_queue))
        yield
        while not logger_queue.empty():
            log_record: logging.LogRecord = logger_queue.get()
            logger._log(
                level=log_record.levelno,
                msg=log_record.message,
                args=log_record.args,
                exc_info=log_record.exc_info,
            )

    return ctx


def test_caplog_already_not_fails(caplog, caplog_workaround):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        with caplog_workaround():
            p = Process(target=current_actor_context.run)
            p.start()
            p.join()
    assert "Some msg" in caplog.text


def test_caplog_passes(caplog, capsys):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        current_actor_context.run()
    assert "Some msg" in caplog.text

推荐阅读