python - 基于 confluent_kafka 的生产者:尽管可以访问 Kafka,但消息似乎永远不会被传递
问题描述
我正在尝试基于confluent_kafka创建一个简单的 Kafka 生产者。我的代码如下:
#!/usr/bin/env python
from confluent_kafka import Producer
import json
def delivery_report(err, msg):
"""Called once for each message produced to indicate delivery result.
Triggered by poll() or flush().
see https://github.com/confluentinc/confluent-kafka-python/blob/master/README.md"""
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(
msg.topic(), msg.partition()))
class MySource:
"""Kafka producer"""
def __init__(self, kafka_hosts, topic):
"""
:kafka_host list(str): hostnames or 'host:port' of Kafka
:topic str: topic to produce messages to
"""
self.topic = topic
# see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
config = {
'metadata.broker.list': ','.join(kafka_hosts),
'group.id': 'mygroup',
}
self.producer = Producer(config)
@staticmethod
def main():
topic = 'my-topic'
message = json.dumps({
'measurement': [1, 2, 3]})
mys = MySource(['kafka'], topic)
mys.producer.produce(
topic, message, on_delivery=delivery_report)
mys.producer.flush()
if __name__ == "__main__":
MySource.main()
我第一次使用主题(此处:“my-topic”)时,Kafka 确实会响应“自动创建具有 1 个分区和复制因子 1 的主题 my-topic 成功(kafka.server.KafkaApis)”。on_delivery=delivery_report
然而,回调函数flush()
(如果我使用现有主题,Kafka 日志不会显示任何内容。
解决方案
推荐阅读
- javascript - Vue.js / Nginx / Node.js - 413 请求实体太大
- matplotlib - 根据下拉列表中的值返回图表
- c - 程序在 pthread_join 上终止
- kotlin - 变量重新分配没有空安全性?
- python - 在 Pandas 数据框中有条件地连接字符串
- oracle - oracle xml解析多行
- logging - 在java中记录用户
- cumulocity - Cumulocity 单点登录 - 在 SSO 用户的上下文中使用微服务进行身份验证
- java - 如何在 JSON 响应中的数组对象本身的数组之前添加名称?
- php - Woocommerce:购物车页面上每个产品的自定义字段