Can't find the right way to handle recovery in Spring Kafka

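Problem description

I'm working on a fairly simple Spring Boot application that consumes messages from a single topic and calls an external web service for each message. I want the service to be reasonably smart about errors: for example, if the external web service is unavailable for a short time, the record should be retried with exponential back-off until we give up, at which point we just log the error and commit the record.

I'm using Spring Boot and Spring Kafka 2.3.3.

Below is some of the Spring configuration I have for this setup. Some parts are omitted for brevity; ask if any other values or configuration would help.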

@Configuration
@EnableKafka
public class SpringConfiguration {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
        // .. Misc other properties related to serialisation etc ..
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public SeekToCurrentErrorHandler eh() {
        long initialMillis = 500;
        long factor = 2;
        long maxElapsedTimeSecs = 60;
        ExponentialBackOff backoff = new ExponentialBackOff(initialMillis, factor);
        backoff.setMaxElapsedTime(maxElapsedTimeSecs*1000);

        BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, exc) ->  {
           // TODO In the final app do something more useful here
           logger.error("* Maximum retry policy has been reached {} - acknowledging and proceeding *", rec);
        };

        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, backoff);
        eh.setCommitRecovered(true);
        return eh;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMissingTopicsFatal(missingTopicsFatal); // True in prod, false otherwise

        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(60));
        factory.setStatefulRetry(true);
        factory.setErrorHandler(eh());
        return factory;
    }
}

My listener is very simple:

@KafkaListener(topics = "${kafka.input_topic}")
public void handle(ConsumerRecord<String, SendToEBoksMessage> record, Acknowledgment acknowledgment) throws Exception {
    logger.info("Listener invoked");
    // TODO Right so simulate some sort of problem. External web service not available, for example.
    throw new Exception("Exception of some kind");
}

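But it seems that the increasing delay introduced by the ExponentialBackOff passed to the SeekToCurrentErrorHandler causes Kafka to rebalance. After a few retries, the log shows the rebalance taking place: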

...
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void MyListenerClass.handle(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, MyMessageClass>,org.springframework.kafka.support.Acknowledgment) throws java.lang.Exception' threw exception; nested exception is java.lang.Exception: Exception of some kind; nested exception is java.lang.Exception: Exception of some kind
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1686)
    ... 10 more
Caused by: java.lang.Exception: Exception of some kind
    at MyListenerClass.handle(SendToEboksMessageKafkaListener.java:20)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

2019-12-16 12:49:04.364  INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-1, groupId=mygroupid] Revoking previously assigned partitions [MyTopic-0]
2019-12-16 12:49:04.365  INFO michael-laptop --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : MyTopic: partitions revoked: [MyTopic-0]
2019-12-16 12:49:04.365  INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator          : [Consumer clientId=consumer-1, groupId=mygroupid] (Re-)joining group
2019-12-16 12:49:04.373  INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator          : [Consumer clientId=consumer-1, groupId=mygroupid] Successfully joined group with generation 18
2019-12-16 12:49:04.373  INFO michael-laptop --- [ntainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-1, groupId=mygroupid] Setting newly assigned partitions: 
2019-12-16 12:49:04.373  INFO michael-laptop --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : ...

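I don't understand why this happens. From what I have read, using the error handler for recovery in this way means the retries are handled by the container, which should avoid the potential problem of not calling consumer.poll() frequently enough to satisfy the max.poll.interval.ms property.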
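
To make the timing concrete, here is a rough plain-Java sketch of the delays the configured back-off would produce. It is a simplified model, not Spring's ExponentialBackOff class: the class name BackOffSchedule and the method delays are hypothetical, and the 30 second cap on a single delay is an assumption based on what I believe is Spring's default maximum interval.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of an exponential back-off; NOT the Spring implementation.
final class BackOffSchedule {

    /** Returns the delays (in ms) handed out before the back-off gives up. */
    static List<Long> delays(long initialMs, double multiplier,
                             long maxIntervalMs, long maxElapsedMs) {
        List<Long> result = new ArrayList<>();
        long elapsed = 0;   // sum of the delays handed out so far
        long current = -1;  // -1 means "no delay handed out yet"
        // A new delay is handed out while the accumulated delay is below the limit.
        while (elapsed < maxElapsedMs) {
            if (current < 0) {
                current = Math.min(initialMs, maxIntervalMs);
            } else if (current < maxIntervalMs) {
                current = Math.min((long) (current * multiplier), maxIntervalMs);
            }
            result.add(current);
            elapsed += current;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the question: 500 ms initial, factor 2, 60 s max elapsed.
        // The 30 s per-delay cap is an assumed default.
        System.out.println(delays(500, 2.0, 30_000, 60_000));
    }
}
```

Under these assumptions the sketch prints [500, 1000, 2000, 4000, 8000, 16000, 30000], so the longest single pause between polls would be about 30 seconds. That is the number to compare against the consumer's max.poll.interval.ms, whose Kafka default is 300000 ms (5 minutes) unless it has been overridden.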

Can anyone tell me what I'm doing wrong here?

*** Update:

I see the following in the Kafka broker log:

[2019-12-17 14:13:22,714] INFO [GroupCoordinator 1001]: Preparing to rebalance group MyGroup1 in state PreparingRebalance with old generation 0 (__consumer_offsets-37) (reason: Adding new member consumer-1-2d76a488-3677-4294-9aed-c153f0dca66c with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:13:22,722] INFO [GroupCoordinator 1001]: Stabilized group MyGroup1 generation 1 (__consumer_offsets-37) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:13:22,735] INFO [GroupCoordinator 1001]: Assignment received from leader for group MyGroup1 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:18,096] INFO [GroupCoordinator 1001]: Preparing to rebalance group MyGroup1 in state PreparingRebalance with old generation 1 (__consumer_offsets-37) (reason: Adding new member consumer-1-addbdcfd-21ed-44fa-9d17-b10c7c67f07f with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:20,161] INFO [GroupCoordinator 1001]: Stabilized group MyGroup1 generation 2 (__consumer_offsets-37) (kafka.coordinator.group.GroupCoordinator)
[2019-12-17 14:14:20,163] INFO [GroupCoordinator 1001]: Assignment received from leader for group MyGroup1 for generation 2 (kafka.coordinator.group.GroupCoordinator)

Tags: java, spring-boot, spring-kafka

Solution


Here is an application I wrote a while ago; I updated it to Boot 2.2.2 and it works fine:

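import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@SpringBootApplication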
public class Kgh1234Application {

    public static void main(String[] args) {
        SpringApplication.run(Kgh1234Application.class, args);
    }


    @KafkaListener(id = "kgh1234", topics = "kgh1234")
    public void listen(String in) {
        System.out.println(in);
        if (in.endsWith("5")) {
            throw new RuntimeException("fail");
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 2L)));
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("kgh1234", 32, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("kgh1234", "foo" + i));
        };
    }

}

spring.kafka.consumer.auto-offset-reset=earliest

It just uses the default recoverer, which simply logs the record once the retries are exhausted.

foo5

2019-12-17 10:50:32.018 ERROR 32052 --- [kgh1234-0-C-1] o.s.k.l.SeekToCurrentErrorHandler : Backoff FixedBackOff{interval=0, currentAttempts=3, maxAttempts=2} exhausted for ConsumerRecord(topic = kgh1234, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1576597830940, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo5)
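
The currentAttempts=3, maxAttempts=2 pair in that log line can be read as "two retries after the first failure, then recover". Below is a small plain-Java sketch of that counting, assuming the back-off counter is advanced once per failed delivery. It is only a model: FixedBackOffModel, FixedExecution and deliver are hypothetical names, and none of this is the container's actual code.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Simplified model of retry-then-recover with a fixed back-off; NOT Spring code.
final class FixedBackOffModel {

    static final long STOP = -1L;

    /** Hands out `interval` for the first `maxAttempts` failures, then STOP. */
    static final class FixedExecution {
        private final long interval;
        private final long maxAttempts;
        private long currentAttempts = 0;

        FixedExecution(long interval, long maxAttempts) {
            this.interval = interval;
            this.maxAttempts = maxAttempts;
        }

        long nextBackOff() {
            currentAttempts++;
            return currentAttempts <= maxAttempts ? interval : STOP;
        }
    }

    /** Redelivers the record until the listener succeeds or the back-off stops. */
    static int deliver(String record, long interval, long maxAttempts,
                       Consumer<String> listener,
                       BiConsumer<String, Exception> recoverer) {
        FixedExecution backOff = new FixedExecution(interval, maxAttempts);
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                listener.accept(record);
                return deliveries;                 // processed successfully
            } catch (RuntimeException e) {
                long delay = backOff.nextBackOff();
                if (delay == STOP) {
                    recoverer.accept(record, e);   // retries exhausted
                    return deliveries;
                }
                try {
                    Thread.sleep(delay);           // pause before redelivery
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return deliveries;
                }
            }
        }
    }

    public static void main(String[] args) {
        int n = deliver("foo5", 0L, 2L,
                in -> { if (in.endsWith("5")) throw new RuntimeException("fail"); },
                (rec, ex) -> System.out.println("retries exhausted for " + rec));
        System.out.println("deliveries: " + n);
    }
}
```

In this model a record that always fails is delivered three times with FixedBackOff(0L, 2L) before the recoverer is called, which is consistent with the counter values shown in the log.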

