首页 > 解决方案 > 使用 acks=all 和刷新的 Kakfa 生产者消息传递

问题描述

使用“acks=all”配置创建 Kafka Producer。

他们用上面的配置调用flush有什么意义吗?

它会在发送到代理之前等待调用刷新吗?

作为

acks=all 这意味着领导者将等待完整的同步副本集来确认记录。这保证了只要至少一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于 acks=-1 设置。

标签: apache-kafkakafka-producer-api

解决方案


根据文档

冲洗():

调用此方法使所有缓冲的记录立即可供发送(即使 linger_ms 大于 0)并阻塞与这些记录关联的请求的完成。flush() 的后置条件是任何先前发送的记录都将完成(例如 Future.is_done() == True)。当根据生产者的“acks”配置成功确认请求或导致错误时,请求被视为已完成。

当一个线程被阻塞等待刷新调用完成时,其他线程可以继续发送消息;但是,不能保证在刷新调用开始后发送的消息完成。

即使 ack=0,flush() 仍然会阻塞客户端应用程序,直到所有消息都发送完毕。唯一的事情是它不会等待一个确认,该块只是直到缓冲区被发送出去。

带有 ack=all 的 flush() 保证消息已发送并已在具有所需复制因子的集群上复制。

最后,回答您的问题:在发送到 broker 之前,它会等待调用 flush吗?

答:不一定。生产者按间隔或按批量大小不断发送消息( buffer.memory 控制生产者可用于缓冲的内存总量)。但是,flush() 确保发送所有消息总是好的。

有关更多信息,请参阅此链接


推荐阅读