首页 > 解决方案 > 基于 confluent_kafka 的生产者:尽管可以访问 Kafka,但消息似乎永远不会被传递

问题描述

我正在尝试基于confluent_kafka创建一个简单的 Kafka 生产者。我的代码如下:

 #!/usr/bin/env python
 from confluent_kafka import Producer
 import json


 def delivery_report(err, msg):
     """Called once for each message produced to indicate delivery result.
     Triggered by poll() or flush().
     see https://github.com/confluentinc/confluent-kafka-python/blob/master/README.md"""
     if err is not None:
         print('Message delivery failed: {}'.format(err))
     else:
         print('Message delivered to {} [{}]'.format(
             msg.topic(), msg.partition()))


 class MySource:
     """Kafka producer"""
     def __init__(self, kafka_hosts, topic):
         """
         :kafka_host list(str): hostnames or 'host:port' of Kafka
         :topic str: topic to produce messages to
         """
         self.topic = topic
         # see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
         config = {
             'metadata.broker.list': ','.join(kafka_hosts),
             'group.id': 'mygroup',
             }
         self.producer = Producer(config)

     @staticmethod
     def main():
         topic = 'my-topic'
         message = json.dumps({
             'measurement': [1, 2, 3]})
         mys = MySource(['kafka'], topic)
         mys.producer.produce(
                 topic, message, on_delivery=delivery_report)
         mys.producer.flush()


 if __name__ == "__main__":
     MySource.main()

我第一次使用主题(此处:“my-topic”)时,Kafka 确实会响应“自动创建具有 1 个分区和复制因子 1 的主题 my-topic 成功(kafka.server.KafkaApis)”。on_delivery=delivery_report然而,回调函数flush()(如果我使用现有主题,Kafka 日志不会显示任何内容。

标签: pythonapache-kafka

解决方案


推荐阅读