spring - Spring Kafka 不尊重 max.poll.records 的奇怪行为
问题描述
好吧,我正在尝试以下场景:
- 在 application.properties 中将 max.poll.records 设置为 50。
- 在 application.properties 中将 enable-auto-commit=false 和 ack-mode 设置为手动。
- 在我的方法中添加了@KafkaListener,但不要提交任何消息,只需阅读、记录但不要发出 ACK。
实际上,在我的 Kafka 主题中,我有 500 条消息要使用,所以我期待以下行为:
- Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
- 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
- 在下一次 Spring Kafka poll() 调用中,获取与步骤 1 相同的 50 条消息(偏移量 0 到 50)。据我了解,Spring Kafka 应该继续在此循环中(步骤 1-3)读取始终相同的消息。
但是会发生以下情况:
- Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
- 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
- 在下一个 Spring Kafka poll() 调用中,获取 NEXT 50 条消息,与步骤 1 不同(偏移量 50 到 100)。
Spring Kafka 以 50 条消息为单位读取 500 条消息,但不提交任何内容。如果我关闭应用程序并重新启动,则会再次收到 500 条消息。
所以,我的疑问:
- 如果我将 max.poll.recors 配置为 50,如果我没有提交任何内容,spring Kafka 如何获取接下来的 50 条记录?我知道 poll() 方法应该返回相同的记录。
- Spring Kafka 有缓存吗?如果是的话,如果我在没有提交的情况下在缓存中获得 100 万条记录,这可能是个问题。
解决方案
你的第一个问题:
如果我将 max.poll.recors 配置为 50,如果我没有提交任何内容,spring Kafka 如何获取接下来的 50 条记录?我知道 poll() 方法应该返回相同的记录。
首先,为了确保你没有提交任何东西,你必须确保你理解以下 3 个参数,我相信你已经理解了。
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
,将其设置为 false(这也是推荐的默认值)。如果它设置为 false,请注意这auto.commit.interval.ms
变得无关紧要。查看此文档:
因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 为 false。从 2.3 版开始,它无条件地将其设置为 false,除非在消费者工厂中特别设置或容器的消费者属性覆盖。
factory.getContainerProperties().setAckMode(AckMode.MANUAL)
; 您有责任承认。(在使用事务时忽略)并且ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
不能是true
.factory.getContainerProperties().setSyncCommits(true/false);
设置是否调用consumer.commitSync()
或commitAsync()
容器何时负责提交。默认为真。这负责与 Kafka 同步,没有别的,如果设置为 true,则该调用将阻塞,直到 Kafka 响应。
其次,消费者 poll() 不会返回相同的记录。对于当前运行的消费者,它使用一些内部索引来跟踪它在内存中的偏移量,我们不必关心提交偏移量。另请参阅@GaryRussell在此处的解释。
简而言之,他解释说:
一旦轮询返回了记录(并且偏移量未提交),它们将不会再次返回,除非您重新启动消费者或对消费者执行 seek() 操作以将偏移量重置为未处理的记录。
你的第二个问题:
Spring Kafka 有缓存吗?如果是的话,如果我在没有提交的情况下在缓存中获得 100 万条记录,这可能是个问题。
没有“缓存”,都是关于偏移量和提交的,解释如上。
现在要实现您想要做的事情,您可以考虑在获取前 50 条记录后做 2 件事,即下一个 poll():
- 要么,以编程方式重新启动容器
- 或致电
consumer.seek(partition, offset);
奖励:
无论您选择什么配置,您都可以通过查看此输出的列来查看结果:LAG
kafka-consumer-groups.bat --bootstrap-server localhost:9091 --describe --group your_group_name
推荐阅读
- python - 定义范围内素数的函数
- julia - 如何在 Julia 中重新计算向量的 eltype
- angular - Angular Observable 过滤
- arrays - 如何在正文视图中使用计数器来填充列表
- logstash - Logstash 中多个 End 事件的经过时间
- python - Django / Python - 查询计算字段
- python-3.x - 使用 keras 的多项式回归
- python - 不使用科学计数法打印
- php - 在 bash 脚本中使用 CURL 请求触发 aws php sdk 浏览器 URL
- html - 使用 flexbox 在 div 旁边水平对齐内容