首页 > 解决方案 > 当并发级别小于分区号时,Spring Kafka SeekToCurrentErrorHandler maxFailures 不起作用

问题描述

我正在使用 spring kafka 2.2.7,我的消费者配置代码如下:

@Slf4j
@Configuration
@EnableKafka
public class KafkaConfiguration {

  @Bean
  ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // less than number of partition, will do infinite retry
    factory.setConcurrency(1);
    SeekToCurrentErrorHandler errorHandler =
        new SeekToCurrentErrorHandler((record, exception) -> {
          LOGGER.info("***in error handler data, {}", record);
        }, 1);
    factory.setErrorHandler(errorHandler);
    return factory;
  }

  @Bean
  public ConsumerFactory<String, Customer> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);

    props.put("schema.registry.url", "http://127.0.0.1:8081");
    props.put("specific.avro.reader", "true");

    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    return props;
  }
}


@Component
@Slf4j
public class KafkaConsumerService {

  @KafkaListener(id = "demo-consumer-stream-group", topics = "kafka-demo-avro")
  public void process(ConsumerRecord<String, Customer> record) {
    LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
    LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
    throw new RuntimeException("force to retry");
  }
}

因此,如果我的侦听器中发生异常,如果我指定的并发级别小于我的主题的分区计数,即使我在配置中配置了 maxFailures,消费者也会永远重试失败的消息。

仅当我至少每隔一秒钟发送一条消息时,它才会起作用。如果我将消息作为批处理发送,则该行为将不起作用。除非我重新启动消费者并且它将正常工作。

重现步骤: 1. 创建一个具有多个分区的主题,例如 3 或 6 2. 在 Spring kafka 配置中,将并发级别指定为 1 3. 对于SeekToCurrentErrorHandler,指定maxFailure为正值,例如 3 4. 发送十几个消息到主题

您将看到每条失败的消息都会进行无限重试,而不是maxFailure我指定的。此外,我可以看到很多消息落后于消费者滞后。

但是,如果您停止侦听器并再次启动侦听器,它将正确跳过失败的消息。

标签: apache-kafkaspring-kafka

解决方案


这是 Spring Kafka 的错误,2.2.7.RELEASE但已在2.2.8.RELEASE.


推荐阅读