apache-kafka - enable.auto.commit 和 auto.commit.interval.ms 如何影响消费者的偏移提交
问题描述
这是我的消费者设置。
enable.auto.commit - true (default value)
auto.commit.interval.ms - 5000 ms (default value)
max.poll.interval.ms - 5 mins (default value)
max.poll.records - 500 (default value)
使用这些设置,假设我在轮询时获得 500 条记录,如果消费者在这 5000 毫秒内只能处理 100 条记录,我的问题是
- 它会只提交 100 条记录吗?
- 如果上述问题的答案是“是”,那么其他记录会怎样?
- 如果第一个问题的答案是否定的,它应该提交所有 500 条记录的偏移量。那么“max.poll.interval.ms”什么时候出现,这对偏移提交有何影响?
解决方案
我假设您正在询问〜现代的 Java 消费者。
它会只提交 100 条记录吗?
subscribe()
如果您使用消费者组管理(函数),除了向代理发送心跳之外,消费者上的一切都发生在调用者线程(您的线程)上,作为调用的一部分poll()
。这包括提交补偿。这意味着在您致电之前不会发送任何偏移量poll()
,因此在您的情况下,答案是否定的 - 只有在您完成这 500 条记录后才会提交偏移量。
如果上述问题的答案是“是”,那么其他记录会怎样?
答案是否定的,但在一些较旧的客户端中,后台线程负责自动偏移提交,更糟糕的情况是,如果您的应用程序崩溃,它将恢复到第 500 条记录的位置(因此您会跳过那些 400您尚未处理的记录)。但同样,现代消费者并非如此
如果第一个问题的答案是否定的,它应该提交所有 500 条记录的偏移量。那么“max.poll.interval.ms”什么时候出现,这对偏移提交有何影响?
subscibe()
仅当您使用消费者组管理(而不是assign()
)时,偏移提交和“活跃度”才相关。假设您使用 CGM,kafka 集群需要确定消费者是否“活着”,如果他们认为消费者已经死亡,则其工作(分区)被重新分配给另一个活着的消费者。现代 kafka 将“活力”定义为“取得进展”,取得进展意味着您“经常”调用民意调查。“经常足够”定义为max.poll.interval
- 因此,即使有一个心跳线程在更短的时间间隔内向 kafka 发送心跳(我认为默认值是 ~3 秒),如果你停止调用 poll 5 分钟,心跳线程也会停止。更准确地说 - 心跳线程将向 kafka 发送离开组请求,然后停止。如果您处于这种情况(因缺乏进展而被踢出组),您的消费者提交抵消的任何尝试都将失败 - 如果使用 CGM,kafka 仅接受来自实时成员的抵消提交。
max.poll.interval
这意味着在和之间存在固有的权衡max.poll.records
- 您从消费者那里获得的工作量越大,poll()
您完成它们并poll()
再次调用所需的时间越长,您被踢出小组的风险就越高。
推荐阅读
- django - 用于 Django REST 框架的 Nginx 上传模块
- node.js - 有没有办法根据 UUID 数组更新 Postgres 中的行集?
- amazon-web-services - 如何确定哪些 AMI 可用于选定的 AZ
- performance - 大内存使用会减慢不相关的代码
- django - 如何在 Django 中从购物车中删除产品?
- c - 当用户在 C 中输入“#”时,停止将字符串连接到动态数组
- jquery - jQuery jqgrid 不执行服务器调用
- android - 安装位置 2.3.5 包时出现 Flutter 编译器错误
- elasticsearch - 在弹性搜索的子聚合中过滤
- swift - 发布图像对象 Clarifai rest api swift