首页 > 解决方案 > 发生错误时如何关闭KafkaListener

问题描述

我用这种方式写了一个 Listener

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@KafkaListener(containerFactory = "cdcKafkaListenerContainerFactory", errorHandler = "errorHandler")
public void consume(@Payload String message) throws Exception {
    ...
}

@Bean
public KafkaListenerErrorHandler errorHandler() {
    return ((message, e) -> {
        kafkaListenerEndpointRegistry.stop();
        return null;
    });
}

@KafkaListener注释中,我指定了简单地停止消费者的错误处理程序。它似乎有效,但我有一些问题要问。

此范围是否有内置的 errorHandler ?我读过ContainerStoppingErrorHandler可以使用,但我无法设置它,因为@KafkaListenererrorHandler 接受KafkaListenerErrorHandler类型的 bean。

我看到了kafkaListenerEndpointRegistry.stop(); 优雅地停下来。因此,在停止之前,已消费消息的分区偏移量被提交。我所知道的是在kafkaListenerEndpointRegistry.stop();被调用时和在监听器被确定关闭之前会发生什么另一条消息到达主题?这条消息被消费了吗?

我想象这个场景

time0: kafkaListenerEndpointRegistry.stop() is called
time1: a message is pushed into the listened topic
time2: kafkaListenerEndpointRegistry.stop() complete graceful stop

我担心消息可能会在 time1 到达。在这种情况下会发生什么?

标签: apache-kafkaspring-kafka

解决方案


不要停止侦听器内的容器。

ContainerStoppingErrorHandler是在容器工厂上设置的,而不是注解。

如果您使用的是 Spring Boot,只需将错误处理程序声明为 bean,boot 就会将其连接进去。

否则将错误处理程序添加到连接工厂 bean。

使用此错误处理程序,抛出异常将立即停止容器。


推荐阅读