首页 > 解决方案 > enable.auto.commit 和 auto.commit.interval.ms 如何影响消费者的偏移提交

问题描述

这是我的消费者设置。

enable.auto.commit  - true (default value)
auto.commit.interval.ms - 5000 ms (default value)
max.poll.interval.ms - 5 mins (default value)
max.poll.records - 500 (default value)

使用这些设置,假设我在轮询时获得 500 条记录,如果消费者在这 5000 毫秒内只能处理 100 条记录,我的问题是

  1. 它会只提交 100 条记录吗?
  2. 如果上述问题的答案是“是”,那么其他记录会怎样?
  3. 如果第一个问题的答案是否定的,它应该提交所有 500 条记录的偏移量。那么“max.poll.interval.ms”什么时候出现,这对偏移提交有何影响?

标签: apache-kafka

解决方案


我假设您正在询问〜现代的 Java 消费者。

它会只提交 100 条记录吗?

subscribe()如果您使用消费者组管理(函数),除了向代理发送心跳之外,消费者上的一切都发生在调用者线程(您的线程)上,作为调用的一部分poll()。这包括提交补偿。这意味着在您致电之前不会发送任何偏移量poll(),因此在您的情况下,答案是否定的 - 只有在您完成这 500 条记录后才会提交偏移量。

如果上述问题的答案是“是”,那么其他记录会怎样?

答案是否定的,但在一些较旧的客户端中,后台线程负责自动偏移提交,更糟糕的情况是,如果您的应用程序崩溃,它将恢复到第 500 条记录的位置(因此您会跳过那些 400您尚未处理的记录)。但同样,现代消费者并非如此

如果第一个问题的答案是否定的,它应该提交所有 500 条记录的偏移量。那么“max.poll.interval.ms”什么时候出现,这对偏移提交有何影响?

subscibe()仅当您使用消费者组管理(而不是assign())时,偏移提交和“活跃度”才相关。假设您使用 CGM,kafka 集群需要确定消费者是否“活着”,如果他们认为消费者已经死亡,则其工作(分区)被重新分配给另一个活着的消费者。现代 kafka 将“活力”定义为“取得进展”,取得进展意味着您“经常”调用民意调查。“经常足够”定义为max.poll.interval- 因此,即使有一个心跳线程在更短的时间间隔内向 kafka 发送心跳(我认为默认值是 ~3 秒),如果你停止调用 poll 5 分钟,心跳线程也会停止。更准确地说 - 心跳线程将向 kafka 发送离开组请求,然后停止。如果您处于这种情况(因缺乏进展而被踢出组),您的消费者提交抵消的任何尝试都将失败 - 如果使用 CGM,kafka 仅接受来自实时成员的抵消提交。

max.poll.interval这意味着在和之间存在固有的权衡max.poll.records- 您从消费者那里获得的工作量越大,poll()您完成它们并poll()再次调用所需的时间越长,您被踢出小组的风险就越高。


推荐阅读