首页 > 解决方案 > Kafka生产者flush和poll的区别

问题描述

我们有一个 Kafka 消费者,它将读取消息并执行此操作,然后使用以下脚本再次发布到 Kafka 主题

生产者配置:

{
  "bootstrap.servers": "localhost:9092"
}

我还没有配置任何其他配置,例如queue.buffering.max.messages queue.buffering.max.ms batch.num.messages

我假设这些都将是配置中的默认值

queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000

我的理解:当内部队列达到 queue.buffering.max.ms 或 batch.num.messages 消息时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以当我调用生产()时,每条消息都会被发布。如果我错了,请纠正我。

我的制作人片段:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.flush()

这篇文章中我了解到,在每条消息之后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,发布到 Kafka 大约需要 45 毫秒

如果我将上面的代码段更改为

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.poll(0)

有没有什么性能会有所提升?你能澄清我的理解吗?

谢谢

标签: pythonapache-kafkakafka-producer-apiconfluent-platform

解决方案


flush()和之间的区别在poll()客户的文档中进行了说明。

对于flush(),它指出:

等待生产者队列中的所有消息被传递。这是一种调用 poll() 直到 len() 为零或可选超时结束的便捷方法。

对于poll()

轮询生产者的事件并调用相应的回调(如果已注册)。

poll()在 a 之后调用send()不会使生产者同步,因为刚刚发送的消息不太可能已经到达代理并且已经将传递报告发送回客户端。

而是flush()会阻塞,直到先前发送的消息已被传递(或错误),从而有效地使生产者同步。


推荐阅读