apache-kafka - 当并发级别小于分区号时,Spring Kafka SeekToCurrentErrorHandler maxFailures 不起作用
问题描述
我正在使用 spring kafka 2.2.7,我的消费者配置代码如下:
@Slf4j
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// less than number of partition, will do infinite retry
factory.setConcurrency(1);
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
LOGGER.info("***in error handler data, {}", record);
}, 1);
factory.setErrorHandler(errorHandler);
return factory;
}
@Bean
public ConsumerFactory<String, Customer> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://127.0.0.1:8081");
props.put("specific.avro.reader", "true");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return props;
}
}
@Component
@Slf4j
public class KafkaConsumerService {
@KafkaListener(id = "demo-consumer-stream-group", topics = "kafka-demo-avro")
public void process(ConsumerRecord<String, Customer> record) {
LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
throw new RuntimeException("force to retry");
}
}
因此,如果我的侦听器中发生异常,如果我指定的并发级别小于我的主题的分区计数,即使我在配置中配置了 maxFailures,消费者也会永远重试失败的消息。
仅当我至少每隔一秒钟发送一条消息时,它才会起作用。如果我将消息作为批处理发送,则该行为将不起作用。除非我重新启动消费者并且它将正常工作。
重现步骤: 1. 创建一个具有多个分区的主题,例如 3 或 6 2. 在 Spring kafka 配置中,将并发级别指定为 1 3. 对于SeekToCurrentErrorHandler
,指定maxFailure
为正值,例如 3 4. 发送十几个消息到主题
您将看到每条失败的消息都会进行无限重试,而不是maxFailure
我指定的。此外,我可以看到很多消息落后于消费者滞后。
但是,如果您停止侦听器并再次启动侦听器,它将正确跳过失败的消息。
解决方案
这是 Spring Kafka 的错误,2.2.7.RELEASE
但已在2.2.8.RELEASE
.
推荐阅读
- database - 使用数据库/sql 原始查询和 go 通道填充项目的惯用方式
- javascript - 使用 Apify 的 Puppetetteer Crawler 抓取网站页面
- django - Django:显示每个用户的个人资料
- javascript - React 中的类与 JSX 对象
- javascript - Idea 认为在 Promise.all() 中初始化的变量未初始化
- vue.js - 如何更新 Nuxt.js 中的 $auth.user 属性?
- selenium-webdriver - 有时点击提交后 WebDriver 切换到 defaultContext 而不是当前帧
- informatica-powercenter - 如何使用 informatica 中的 SQL 转换在表中运行插入参数化值
- .net-core-3.0 - Text.Json 自定义 JsonConverter 被忽略
- javascript - 如何让JS信息转到php生成文本文件?