首页 > 解决方案 > 卡夫卡消费者长时间闲置后不消费

问题描述

嗨,我有 kafka 消费者消费数据。下面的命令给了我消费者组命令超时。

kafka-consumer-groups.sh --bootstrap-server b1:9092,b2:9092,b3:9092,b4:9092,b5:9092,b6:9092,b7:9092,b8:9092,b9:9092,b10:9092,b11:9092,b12:9092,b13:9092 --describe --group testgroup

错误:执行消费者组命令失败,原因是消费者组命令在等待组初始化时超时:

所有消费者已经使用数据超过 26 小时。由于生产者在这 6 小时内不再生成数据,因此存在超过 6 小时的差距。

我怀疑有一些空闲时间可能断开了消费者组与消费者之间的连接。所有消费者都在以 100ms 的轮询间隔进行消费poll(100)

这种情况已经被观察了 3 次以上。感谢 Kafka 专家的任何帮助。谢谢。

代码:

@Service
public class DedupeConsumerService {

    final Logger logger = LoggerFactory.getLogger(DedupeConsumerService.class);

    @Autowired
    private TaskExecutor taskExecutor;

    @Autowired
    private PropertyConfig config;

    @Autowired
    private ApplicationContext applicationContext;

    public void consume() {

        String topic = config.getDedupServiceConsumerTopic();
        String consGroup = config.getDedupServiceConsGroup();

        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "20000");
        props.put("max.poll.records", "10000");

        KafkaConsumer<String, AvroSyslogMessage> consumer = new GenericConsumer<String, AvroSyslogMessage>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.AVRODESER, props);

        logger.info("Dedupe Kafka Consumer Initialized......");

        try {
            while (true) {
                ConsumerRecords<String, AvroSyslogMessage> records = consumer.poll(100);
                if (records.count() > 0) {

                    }


                    logger.info("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);

                }
            }

        } catch (Throwable e) {
            logger.error("Error occured while processing message", e);
            e.printStackTrace();
        } finally {
            logger.debug("dedupe kafka consume is closing");
            consumer.close();
        }

    }

}

我试着安排投票时间,因为Integer.MAX_VALUE这没有帮助。

标签: apache-kafkakafka-consumer-api

解决方案


考虑为您的消费者配置心跳设置:https ://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_heartbeat.interval.ms

Heartbeat 将确保连接不会自动关闭(很像保持活动状态)。


推荐阅读