python - 如何以编程方式检查 Kafka Broker 是否在 Python 中启动并运行
问题描述
我正在尝试使用来自 Kafka 主题的消息。我正在使用confluent_kafka
消费者的包装器。在开始使用消息之前,我需要检查是否建立了连接。
我读到消费者很懒,所以我需要执行一些操作才能建立连接。但是我想在不做consume
orpoll
操作的情况下检查连接建立。
另外,我尝试提供一些错误的配置,以查看投票的响应是什么。我得到的回应是:
b'Broker: No more messages'
那么,如何判断是连接参数错误,连接断开,还是主题中实际上没有消息呢?
解决方案
恐怕没有直接的方法来测试 Kafka Brokers 是否启动并运行。另请注意,如果您的消费者已经消费了消息,这并不意味着这是一种不良行为,显然这并不表示 Kafka 代理已关闭。
一种可能的解决方法是执行某种快速操作并查看代理是否响应。一个例子是列出主题:
使用confluent-kafka-python
和AdminClient
# Example using confuent_kafka
from confluent_kafka.admin import AdminClient
kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics
if not topics:
raise RuntimeError()
# example using kafka-python
import kafka
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
topics = consumer.topics()
if not topics:
raise RuntimeError()
推荐阅读
- c++ - 如何在 C++ 中使用 OpenMP 并行递归函数
- laravel - 如何在 laravel 6 中将注销方法更改为 GET?
- c - 如何在C中通过引用将矩阵传递给函数?
- java - PageNotFound - 不支持请求方法“GET”
- nodemcu - 如何在 proteus 模拟中将我的代码或我的 .hex 文件上传到 nodeMCU?
- c++ - 通过 libvlc 将音频转换为解压缩 wav
- javascript - 登录按钮单击ReactJS后如何更改页面
- c# - 无法从 .Net Core 3.0 中的程序集中加载类型
- arrays - TypeScript:定义未知大小的数组类型,其中第一个元素是 A 类型,其余元素是 B 类型
- javascript - React 什么时候重新渲染子组件?