首页 > 解决方案 > 如何以编程方式检查 Kafka Broker 是否在 Python 中启动并运行

问题描述

我正在尝试使用来自 Kafka 主题的消息。我正在使用confluent_kafka消费者的包装器。在开始使用消息之前,我需要检查是否建立了连接。

我读到消费者很懒,所以我需要执行一些操作才能建立连接。但是我想在不做consumeorpoll操作的情况下检查连接建立。

另外,我尝试提供一些错误的配置,以查看投票的响应是什么。我得到的回应是:

b'Broker: No more messages'

那么,如何判断是连接参数错误,连接断开,还是主题中实际上没有消息呢?

标签: pythonapache-kafkakafka-consumer-apikafka-python

解决方案


恐怕没有直接的方法来测试 Kafka Brokers 是否启动并运行。另请注意,如果您的消费者已经消费了消息,这并不意味着这是一种不良行为,显然这并不表示 Kafka 代理已关闭。


一种可能的解决方法是执行某种快速操作并查看代理是否响应。一个例子是列出主题:

使用confluent-kafka-pythonAdminClient

# Example using confuent_kafka
from confluent_kafka.admin import AdminClient

kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics

if not topics: 
    raise RuntimeError()

使用kafka-pythonKafkaConsumer

# example using kafka-python
import kafka


consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
topics = consumer.topics()

if not topics: 
    raise RuntimeError()

推荐阅读