首页 > 解决方案 > 如何根据 Kafka SeekToCurrentErrorHandler 中的异常类型设置更改重试次数?

问题描述

我有一个 Spring Boot 应用程序,我正在尝试创建一个 Kafka 重试侦听器,其中根据异常类型我需要更改重试尝试。

例如:

如果异常类型为 A,则重试次数应为 5
如果异常类型为 B,则重试次数应为 10

谁能推荐如何在 Spring Kafka 中做到这一点?

以下是我的 ListenerFactory。我正在使用 SeekToCurrentErrorHandler

Bean
    public ConcurrentKafkaListenerContainerFactory<String, test> retryListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, test> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            System.out.println(
                    "RetryPolicy** limit has been exceeded! You should really handle this better." + record.key());
        }, new FixedBackOff(0L, 3L));
        errorHandler.addNotRetryableException(IllegalArgumentException.class);
        errorHandler.setCommitRecovered(true);
        factory.setErrorHandler(errorHandler);
        // to keep the consumers alive the failure gets reported to broker so that
        // consumers remain alive
        factory.setStatefulRetry(true);
        factory.setConcurrency(2);
        return factory;
    }

标签: javaspring-bootapache-kafkakafka-consumer-apispring-kafka

解决方案


从 2.6 版开始,您可以添加一个函数来BackOff根据消费者记录和/或异常来确定要使用的函数:

/**
 * Set a function to dynamically determine the {@link BackOff} to use, based on the
 * consumer record and/or exception. If null is returned, the default BackOff will be
 * used.
 * @param backOffFunction the function.
 * @since 2.6
 */
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
    this.failureTracker.setBackOffFunction(backOffFunction);
}

仅当此记录没有当前记录时才调用此方法BackOff(即,它对您的其他问题没有帮助)。


推荐阅读