首页 > 解决方案 > Spring Kafka Consumer Retry 具有长时间的退避间隔,给出“org.apache.kafka.clients.consumer.CommitFailedException”

问题描述

我是 Spring-Kafka 的新手,并尝试在使用 Spring Kafka RetryTemplate 处理 kafka 消息期间发生故障或任何异常时实现重试。

我使用了以下代码:

//这是KafkaListenerContainerFactory:

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(retryContext -> {
        ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
        logger.info("Recovery is called for message {} ", consumerRecord.value());
        return Optional.empty();
    });
    return factory;
}

// 重试模板

public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    // Todo: take from config
    fixedBackOffPolicy.setBackOffPeriod(240000);// 240seconds
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    // Todo: take from config
    simpleRetryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    return retryTemplate;
}
    
//

这是消费者工厂

public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

发生任何异常时,将按照重试策略按预期重试。一次,max retries用尽,它调用recovery回调方法。 但在那之后不久,它给出了“java.lang.IllegalStateException:这个错误处理程序无法处理'org.apache.kafka.clients.consumer.CommitFailedException's;没有可用的记录信息”,其中包含一些细节,例如: Failing OffsetCommit request since the consumer is不属于活动组。

似乎无法提交偏移量,因为消费者现在已从组中退出,因为它在下一次轮询之前长时间空闲(backoffperiod *(maxretry-1))。

我需要添加一些大值的 max.poll.interval.ms 吗?

有没有其他方法可以实现这一点,这样即使消费者在处理过程中花费了这么多时间并且计划以较长的时间间隔重试,也不会出现此提交失败错误。

请帮助我。

标签: spring-kafkaretrytemplate

解决方案


总的 backOff 延迟必须小于max.poll.interval.ms以避免重新平衡。

现在首选使用 aSeekToCurrentErrorHandler而不是 a,RetryTemplate因为这样只有每个延迟(而不是聚合)需要小于max.poll.interval.ms

文档在这里


推荐阅读