首页 > 解决方案 > spring kafka max.poll.interval.ms、max.poll.records 和 idleTimeBetweenPolls 的重新平衡问题

问题描述

我看到我的应用程序在不断地重新平衡。我的应用程序是以批处理模式开发的,这里是已添加的配置属性。

myapp.consumer.group.id= cg-id-local
myapp.changefeed.topic= test_topic
myapp.auto.offset.reset=latest
myapp.enable.auto.commit=false
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 20000
myapp.idle.time.between.polls=240000
myapp.concurrency = 10

集装箱工厂:

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory(poSummaryCGID));
        factory.setConcurrency(poSummNoOfConsumers);
        factory.setBatchListener(true);
        factory.setAckDiscarded(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setIdleBetweenPolls(idleTimeBetweenPolls);

我这里有几个问题:

  1. 我设置了每次轮询(4 分钟)的最大记录数为 20000,并且我们在一个 TOPIC 中有 10 个分区。由于我将并发设置为 10,因此 10 个消费者将启动并运行,每个消费者将监听 1 个分区。我的问题是,记录数是否会像每个消费者可以处理 2000 条记录一样分配给所有消费者?

  2. max.poll.interval.ms 已设置为 5 分钟。我确信消费者将在给定的轮询间隔(4 分钟)内处理 2000 条(如果我的上述理解是正确的)记录,该间隔小于具有上限的 max.poll.interval.ms。但不确定为什么会发生再平衡?我还需要设置其他配置属性吗?

帮助将不胜感激!

Tried with these configurations: 

myapp.max.poll.interval.ms=600000 
myapp.max.poll.records= 2000 
myapp.idle.time.between.polls=360000 

myapp.max.poll.interval.ms=300000 
myapp.max.poll.records= 2000 
myapp.idle.time.between.polls=300000 

myapp.max.poll.interval.ms=300000 
myapp.max.poll.records= 2000 
myapp.idle.time.between.polls=180000

编辑修复:我们应该始终 myapp.max.poll.interval.ms >(myapp.idle.time.between.polls + myapp.max.poll.records 处理时间)。

标签: spring-kafka

解决方案


编号max.poll.records是每个消费者,而不是每个主题或容器。

如果您有 concurrency=10 和 10 个分区,您应该减少max.poll.records到 2000,以便每个消费者每次轮询最多获得 2000。

容器将自动减少轮询之间的空闲时间,以便max.poll.interval.ms不会超过,但您应该对这些属性 (max.poll.recordsmax.poll.interval.ms) 保持保守,这样永远不可能超过间隔。


推荐阅读