首页 > 解决方案 > Kafka 消息密钥为无

问题描述

这是我的 Kafka 生产者和消费者的实现:

async def produce(topic_name):
    """Produces data into the Kafka Topic"""
    p = Producer({"bootstrap.servers": BROKER_URL})

    curr_iteration = 0
    while True:
        p.produce(topic_name, f"iteration {curr_iteration}".encode("utf-8"))
        curr_iteration += 1
        await asyncio.sleep(0.5)


async def consume(topic_name):
    """Consumes data from the Kafka Topic"""
    c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "0"})
    c.subscribe([topic_name])
    while True:
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value()}")
        await asyncio.sleep(2.5)

message.key()得到None控制台输出如下:

consumed message None: b'iteration 1'
consumed message None: b'iteration 8'
consumed message None: b'iteration 12'
consumed message None: b'iteration 15'

如何更新代码以获取消息密钥?

标签: pythonapache-kafkapython-asyncio

解决方案


更新您的生产功能,如下所示,

p.produce(topic, key="key", value="value")

要开始向 Kafka 发送消息,请调用该produce方法,传入消息值(可能是None)和optionally a key,分区和回调。生产调用将立即完成并且不返回值。如果由于 librdkafka 的本地生产队列已满而导致消息无法入队,则会引发 KafkaException。

来源:-clients-confluent-kafka-python


推荐阅读