apache-kafka - 发生错误时如何关闭KafkaListener
问题描述
我用这种方式写了一个 Listener
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@KafkaListener(containerFactory = "cdcKafkaListenerContainerFactory", errorHandler = "errorHandler")
public void consume(@Payload String message) throws Exception {
...
}
@Bean
public KafkaListenerErrorHandler errorHandler() {
return ((message, e) -> {
kafkaListenerEndpointRegistry.stop();
return null;
});
}
在@KafkaListener
注释中,我指定了简单地停止消费者的错误处理程序。它似乎有效,但我有一些问题要问。
此范围是否有内置的 errorHandler ?我读过ContainerStoppingErrorHandler
可以使用,但我无法设置它,因为@KafkaListener
errorHandler 接受KafkaListenerErrorHandler
类型的 bean。
我看到了kafkaListenerEndpointRegistry.stop()
; 优雅地停下来。因此,在停止之前,已消费消息的分区偏移量被提交。我所知道的是在kafkaListenerEndpointRegistry.stop();
被调用时和在监听器被确定关闭之前会发生什么另一条消息到达主题?这条消息被消费了吗?
我想象这个场景
time0: kafkaListenerEndpointRegistry.stop() is called
time1: a message is pushed into the listened topic
time2: kafkaListenerEndpointRegistry.stop() complete graceful stop
我担心消息可能会在 time1 到达。在这种情况下会发生什么?
解决方案
不要停止侦听器内的容器。
ContainerStoppingErrorHandler
是在容器工厂上设置的,而不是注解。
如果您使用的是 Spring Boot,只需将错误处理程序声明为 bean,boot 就会将其连接进去。
否则将错误处理程序添加到连接工厂 bean。
使用此错误处理程序,抛出异常将立即停止容器。
推荐阅读
- r - 传单和闪亮的 html 小部件错误呈现
- javascript - 如何在 angularjs 中获取子元素
- http - 如何在 HTTPie 中使用 PUT 上传文件
- php - 使用 PHPExcel 库将 Html 表导出到 Excel 文件时出现乱码
- java - 想要在 spring rest API 中发送文件以及其他一些值
- hyperledger-fabric - 在安装 generator-hyperledger-composer 后的 ubntu 中。generator-hyperledger-composer:找不到命令
- spring - 仅使用 spring boot 依赖注入
- json-deserialization - 虽然反序列化来自api“https://jsonplaceholder.typicode.com/posts”的json字符串
- c# - 如何在 xamarin ios 中自动调整 UITableView 的行高
- rest - 邮递员将不正确的整数值传递给 API 控制器