apache-kafka - 使用 acks=all 和刷新的 Kakfa 生产者消息传递
问题描述
使用“acks=all”配置创建 Kafka Producer。
他们用上面的配置调用flush有什么意义吗?
它会在发送到代理之前等待调用刷新吗?
作为
acks=all 这意味着领导者将等待完整的同步副本集来确认记录。这保证了只要至少一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于 acks=-1 设置。
解决方案
根据文档
冲洗():
调用此方法使所有缓冲的记录立即可供发送(即使 linger_ms 大于 0)并阻塞与这些记录关联的请求的完成。flush() 的后置条件是任何先前发送的记录都将完成(例如 Future.is_done() == True)。当根据生产者的“acks”配置成功确认请求或导致错误时,请求被视为已完成。
当一个线程被阻塞等待刷新调用完成时,其他线程可以继续发送消息;但是,不能保证在刷新调用开始后发送的消息完成。
即使 ack=0,flush() 仍然会阻塞客户端应用程序,直到所有消息都发送完毕。唯一的事情是它不会等待一个确认,该块只是直到缓冲区被发送出去。
带有 ack=all 的 flush() 保证消息已发送并已在具有所需复制因子的集群上复制。
最后,回答您的问题:在发送到 broker 之前,它会等待调用 flush吗?
答:不一定。生产者按间隔或按批量大小不断发送消息( buffer.memory 控制生产者可用于缓冲的内存总量)。但是,flush() 确保发送所有消息总是好的。
有关更多信息,请参阅此链接。
推荐阅读
- javascript - 在 netlify 上上传 vue-cli 应用程序时,json 服务器不工作?
- authentication - Crystal Reports Viewer 要求在 64 位版本上登录数据库
- cocoa - NSVisualEffectView 的位置和大小不明确
- javascript - 数据显示在控制台中,但不在网站 Angular 中
- c# - 私有配置管理器字段在单元测试中失败
- ruby-on-rails - OpenSSL::SSL::SSLError(SSL_connect 返回=1 errno=0 状态=错误:证书验证失败(证书已过期))
- postgresql - CREATE UNIQUE INDEX... 没有约束 vs CREATE TABLE... DDL with PRIMARY KEY UNIQUE INDEX 和 CONSTRAINT?
- segmentation-fault - M1 上的 SceneKit SCNMatrix4Mult EXC_BAD_ACCESS
- c - 如何在特定位置翻转特定位并携带该值以便稍后在程序中再次执行
- android - Gradle build daemon 意外消失