spring-kafka - spring kafka max.poll.interval.ms、max.poll.records 和 idleTimeBetweenPolls 的重新平衡问题
问题描述
我看到我的应用程序在不断地重新平衡。我的应用程序是以批处理模式开发的,这里是已添加的配置属性。
myapp.consumer.group.id= cg-id-local
myapp.changefeed.topic= test_topic
myapp.auto.offset.reset=latest
myapp.enable.auto.commit=false
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 20000
myapp.idle.time.between.polls=240000
myapp.concurrency = 10
集装箱工厂:
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(poSummaryCGID));
factory.setConcurrency(poSummNoOfConsumers);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setIdleBetweenPolls(idleTimeBetweenPolls);
我这里有几个问题:
我设置了每次轮询(4 分钟)的最大记录数为 20000,并且我们在一个 TOPIC 中有 10 个分区。由于我将并发设置为 10,因此 10 个消费者将启动并运行,每个消费者将监听 1 个分区。我的问题是,记录数是否会像每个消费者可以处理 2000 条记录一样分配给所有消费者?
max.poll.interval.ms 已设置为 5 分钟。我确信消费者将在给定的轮询间隔(4 分钟)内处理 2000 条(如果我的上述理解是正确的)记录,该间隔小于具有上限的 max.poll.interval.ms。但不确定为什么会发生再平衡?我还需要设置其他配置属性吗?
帮助将不胜感激!
Tried with these configurations:
myapp.max.poll.interval.ms=600000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=360000
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=300000
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=180000
编辑修复:我们应该始终 myapp.max.poll.interval.ms >(myapp.idle.time.between.polls + myapp.max.poll.records 处理时间)。
解决方案
编号max.poll.records
是每个消费者,而不是每个主题或容器。
如果您有 concurrency=10 和 10 个分区,您应该减少max.poll.records
到 2000,以便每个消费者每次轮询最多获得 2000。
容器将自动减少轮询之间的空闲时间,以便max.poll.interval.ms
不会超过,但您应该对这些属性 (max.poll.records
和max.poll.interval.ms
) 保持保守,这样永远不可能超过间隔。
推荐阅读
- python - 为什么我的班级没有在我的单元测试中被嘲笑?
- javascript - Chart.js 使用静态数据的图表未呈现,可能是由于 Vue
- dataframe - 在 SPARK Sql/Pyspark 中将变量值作为列名传递?
- python - 如何修复 CNN 的“值错误”
- django - 如何在 django admin 中链接(连接)两个下拉菜单
- c# - 如何排除对新子类的访问,同时仍允许从另一个程序集进行测试?
- r - 有效地定义多个特定日期
- javascript - Jquery setinterval 刷新的 ajax 函数打破了下拉列表
- arrays - 从角度数组中选择嵌套数据
- nginx - Minikube Nginx Ingress 找不到服务端点