首页 > 解决方案 > 在spring kafka中使用SeekToCurrentErrorHandler时如何设置重试间隔时间

问题描述

我在 spring-boot 应用程序中使用 SeekToCurrentErrorHandler 处理容器侦听器错误。如果发生异常,我想设置重试间隔,它应该等待一段时间并重试,直到最大尝试次数。

我尝试通过设置备份策略来添加 RetryTemplate。但它并没有例外。当错误发生最大尝试时间时,它将调用 SeekToCurrentErrorHandler 2 次。

@Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }


    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }


    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
    // ------->>     factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
        // handle errors 
        }, retryMaxAttempts);
        factory.setErrorHandler(errorHandler);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


使用 SeekToCurrentErrorHandler 时如何设置重试间隔时间?

标签: spring-bootapache-kafkaspring-kafka

解决方案


这是下一个版本 (2.3) 中的新功能

但是,您可以使用重试模板执行此操作,但总尝试次数将是这两个属性的倍数。


推荐阅读