spring-boot - 在 kafka 消费者中使用 SeekToCurrentErrorHandler 进行无限重试
问题描述
我已经使用 spring-kafka 在 Spring boot 应用程序中配置了一个带有 SeekToCurrentErrorHandler 的 kafka 使用者。我的消费者配置是:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaserver");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, StringDeserializer.class.getName());
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "java.lang.String");
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.lang.String");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(5);
seekToCurrentErrorHandler.setCommitRecovered(true);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(seekToCurrentErrorHandler);
return factory;
}
为了测试 SeekToCurrentErrorHandler 配置,我在 kafka 中推送了一条格式不正确的记录,因此它因反序列化异常而失败。根据我的理解,错误处理程序应该尝试处理失败的记录 5 次,然后它应该记录并移动到下一条记录。但它会无限次地读取失败的记录。
请告诉我哪里出错了。
解决方案
我有完全相同的问题,唯一的解决方法是确保并发级别与主题的分区数相同。否则它将继续无限重试。
听起来像弹簧卡夫卡的错误。
推荐阅读
- asp.net - 如何在 NET Core MVC 中提交不转到新页面的表单?(ajax 除外)
- javascript - 测试对象属性大致相等
- android - Flutter:如何将 Firebase 云消息传递与关闭的应用程序的本地通知集成?
- php - 使用 php 编辑 JSON 解码的键值
- video-streaming - RTSP vs HLS vs WebRTC vs Dash (proper use cases)
- attributes - Magento 2多选自定义产品属性选项未显示
- marklogic - 使用 MarkLogic /v1/values/{name} REST API 进行构面值搜索?
- scala - Akka Stream 的 Keep right/left/两者如何导致不同的输出?
- vue.js - laravel 5.8 中的 Vue,从 axios 响应动态填充表
- css - 在 ReactJS 中使用 Animate.css 和 react-animated-css