首页 > 解决方案 > 如何对使用 confluent_kafka Consumer 的代码进行单元测试?

问题描述

我正在尝试使用 pytest 对以下代码进行单元测试:

import json
from typing import Any, Dict

from confluent_kafka import Consumer


def get_message(config: Dict[str, Any]):
    consumer = Consumer(
        {
            "group.id": config["KAFKA_GROUP_ID"],
            "bootstrap.servers": config["KAFKA_BROKERS"],
            "default.topic.config": {"auto.offset.reset": "smallest"},
        }
    )
    consumer.subscribe([config["KAFKA_TOPIC"]])
    while True:
        collect = consumer.poll()
        if collect is None:
            continue

        try:
            message = json.loads(collect.value().decode("utf-8"))
        except json.JSONDecodeError:
            continue

        return message

但我不能模拟订阅功能。我试过了:

mock_subscribe = MagicMock(return_value='test')
monkeypatch.setattr('confluent_kafka.cimpl.Consumer.subscribe', mock_subscribe)

结果我得到以下错误:

TypeError: can't set attributes of built-in/extension type

如何正确模拟此功能?

标签: pythonpython-3.xpytestpython-unittestconfluent-platform

解决方案


推荐阅读