首页 > 解决方案 > 在 kafka 消费者中使用 SeekToCurrentErrorHandler 进行无限重试

问题描述

我已经使用 spring-kafka 在 Spring boot 应用程序中配置了一个带有 SeekToCurrentErrorHandler 的 kafka 使用者。我的消费者配置是:

     @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaserver");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, StringDeserializer.class.getName());
    props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "java.lang.String");
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.lang.String");

    return new DefaultKafkaConsumerFactory<>(props);
  }


@Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(5);
    seekToCurrentErrorHandler.setCommitRecovered(true);

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(seekToCurrentErrorHandler);

    return factory;
  }

为了测试 SeekToCurrentErrorHandler 配置,我在 kafka 中推送了一条格式不正确的记录,因此它因反序列化异常而失败。根据我的理解,错误处理程序应该尝试处理失败的记录 5 次,然后它应该记录并移动到下一条记录。但它会无限次地读取失败的记录。

请告诉我哪里出错了。

标签: spring-bootapache-kafkakafka-consumer-apispring-kafka

解决方案


我有完全相同的问题,唯一的解决方法是确保并发级别与主题的分区数相同。否则它将继续无限重试。

听起来像弹簧卡夫卡的错误。


推荐阅读