首页 > 解决方案 > Spring Kafka 不尊重 max.poll.records 的奇怪行为

问题描述

好吧,我正在尝试以下场景:

  1. 在 application.properties 中将 max.poll.records 设置为 50。
  2. 在 application.properties 中将 enable-auto-commit=false 和 ack-mode 设置为手动。
  3. 在我的方法中添加了@KafkaListener,但不要提交任何消息,只需阅读、记录但不要发出 ACK。

实际上,在我的 Kafka 主题中,我有 500 条消息要使用,所以我期待以下行为:

  1. Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
  2. 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
  3. 在下一次 Spring Kafka poll() 调用中,获取与步骤 1 相同的 50 条消息(偏移量 0 到 50)。据我了解,Spring Kafka 应该继续在此循环中(步骤 1-3)读取始终相同的消息。

但是会发生以下情况:

  1. Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
  2. 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
  3. 在下一个 Spring Kafka poll() 调用中,获取 NEXT 50 条消息,与步骤 1 不同(偏移量 50 到 100)。

Spring Kafka 以 50 条消息为单位读取 500 条消息,但不提交任何内容。如果我关闭应用程序并重新启动,则会再次收到 500 条消息。

所以,我的疑问:

  1. 如果我将 max.poll.recors 配置为 50,如果我没有提交任何内容,spring Kafka 如何获取接下来的 50 条记录?我知道 poll() 方法应该返回相同的记录。
  2. Spring Kafka 有缓存吗?如果是的话,如果我在没有提交的情况下在缓存中获得 100 万条记录,这可能是个问题。

标签: springapache-kafkaspring-kafka

解决方案


你的第一个问题:

如果我将 max.poll.recors 配置为 50,如果我没有提交任何内容,spring Kafka 如何获取接下来的 50 条记录?我知道 poll() 方法应该返回相同的记录。

首先,为了确保你没有提交任何东西,你必须确保你理解以下 3 个参数,我相信你已经理解了。

  • ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,将其设置为 false(这也是推荐的默认值)。如果它设置为 false,请注意这auto.commit.interval.ms变得无关紧要。查看文档:

因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 为 false。从 2.3 版开始,它无条件地将其设置为 false,除非在消费者工厂中特别设置或容器的消费者属性覆盖。

  • factory.getContainerProperties().setAckMode(AckMode.MANUAL); 您有责任承认。(在使用事务时忽略)并且ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG不能是true.

  • factory.getContainerProperties().setSyncCommits(true/false);设置是否调用consumer.commitSync()commitAsync() 容器何时负责提交。默认为真。这负责与 Kafka 同步,没有别的,如果设置为 true,则该调用将阻塞,直到 Kafka 响应。

其次,消费者 poll() 不会返回相同的记录。对于当前运行的消费者,它使用一些内部索引来跟踪它在内存中的偏移量,我们不必关心提交偏移量。另请参阅@GaryRussell在此处的解释。

简而言之,他解释说:

一旦轮询返回了记录(并且偏移量未提交),它们将不会再次返回,除非您重新启动消费者或对消费者执行 seek() 操作以将偏移量重置为未处理的记录。


你的第二个问题:

Spring Kafka 有缓存吗?如果是的话,如果我在没有提交的情况下在缓存中获得 100 万条记录,这可能是个问题。

没有“缓存”,都是关于偏移量和提交的,解释如上。



现在要实现您想要做的事情,您可以考虑在获取前 50 条记录后做 2 件事,即下一个 poll():

  • 要么,以编程方式重新启动容器
  • 或致电consumer.seek(partition, offset);


奖励:
无论您选择什么配置,您都可以通过查看此输出的列来查看结果:LAG

kafka-consumer-groups.bat --bootstrap-server localhost:9091 --describe --group your_group_name

推荐阅读