python - 如何对使用 confluent_kafka Consumer 的代码进行单元测试?
问题描述
我正在尝试使用 pytest 对以下代码进行单元测试:
import json
from typing import Any, Dict
from confluent_kafka import Consumer
def get_message(config: Dict[str, Any]):
consumer = Consumer(
{
"group.id": config["KAFKA_GROUP_ID"],
"bootstrap.servers": config["KAFKA_BROKERS"],
"default.topic.config": {"auto.offset.reset": "smallest"},
}
)
consumer.subscribe([config["KAFKA_TOPIC"]])
while True:
collect = consumer.poll()
if collect is None:
continue
try:
message = json.loads(collect.value().decode("utf-8"))
except json.JSONDecodeError:
continue
return message
但我不能模拟订阅功能。我试过了:
mock_subscribe = MagicMock(return_value='test')
monkeypatch.setattr('confluent_kafka.cimpl.Consumer.subscribe', mock_subscribe)
结果我得到以下错误:
TypeError: can't set attributes of built-in/extension type
如何正确模拟此功能?
解决方案
推荐阅读
- excel - 如何找到最早日期的配对值?
- azure-devops-pipelines - Azure DevOps Pipelines 中的用户审计?
- bluetooth-lowenergy - 如何禁用蓝牙连接,只做广告
- docusignapi - DocuSign API:如何使用应用程序身份验证创建信封?
- r - R:在交叉表中显示小计
- apache-spark - spark:在 hdfs 中将数据帧写入文件与将 rdd 写入文件之间的区别
- java - 程序解释方法之外的另一部分代码
- c# - 什么是页面导航中的模态上下文?
- java - Java如何将特定时间转换为不同的时区
- r - R data.table 范围,使用变量可靠地引用未知列名