python - Kafka 消息密钥为无
问题描述
这是我的 Kafka 生产者和消费者的实现:
async def produce(topic_name):
"""Produces data into the Kafka Topic"""
p = Producer({"bootstrap.servers": BROKER_URL})
curr_iteration = 0
while True:
p.produce(topic_name, f"iteration {curr_iteration}".encode("utf-8"))
curr_iteration += 1
await asyncio.sleep(0.5)
async def consume(topic_name):
"""Consumes data from the Kafka Topic"""
c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "0"})
c.subscribe([topic_name])
while True:
message = c.poll(1.0)
if message is None:
print("no message received by consumer")
elif message.error() is not None:
print(f"error from consumer {message.error()}")
else:
print(f"consumed message {message.key()}: {message.value()}")
await asyncio.sleep(2.5)
message.key()
得到None
控制台输出如下:
consumed message None: b'iteration 1'
consumed message None: b'iteration 8'
consumed message None: b'iteration 12'
consumed message None: b'iteration 15'
如何更新代码以获取消息密钥?
解决方案
更新您的生产功能,如下所示,
p.produce(topic, key="key", value="value")
要开始向 Kafka 发送消息,请调用该
produce
方法,传入消息值(可能是None
)和optionally a key
,分区和回调。生产调用将立即完成并且不返回值。如果由于 librdkafka 的本地生产队列已满而导致消息无法入队,则会引发 KafkaException。
推荐阅读
- design-patterns - 在向“至少一次”或“恰好一次”消费者发送消息时,我应该在消息生产者中考虑什么?
- c# - 从子页面永久更改母版页内容
- visual-studio - protobuf 的 vcpkg 构建未定义 FatalException
- python - 如何正确设置 DummyCache 以在 Django 中进行测试?
- python - 为什么安装gensim要花很多时间?没有输出需要几个小时
- c - 我的管道正确吗?
- flutter - 为什么恒等函数会破坏 Dart 中函数组合的类型推断?
- sharepoint - 查明是否继承了 OneDrive for Business 权限
- drupal - Drupal Mail 以编程方式 - 需要教程
- android - 在 Android 中以编程方式关闭谷歌照片备份