首页 > 解决方案 > Spring Kafka Consumer 如何从 Avro Deserializer 异常中跳过

问题描述

我正在使用 Spring Kafka 消费者和 Avro 模式来构建我的应用程序。

但是,如果消息无法反序列化到我构建的指定 Avro 特定记录,则消费者将不断重试消费者相同的消息(无限重试)。

对于这种情况,如果我的消费者发生反序列化程序异常,我如何配置消费者应用程序以跳过当前消息并移动到下一个偏移量。

我查看了 Spring Kafka 错误句柄,它只能在侦听器中处理异常,而不是在反序列化阶段。

我的消费者应用程序非常简单:

@KafkaListener(id = "demo-consumer-stream-group", topics = "customer-output-")
  public void process(ConsumerRecord<String, Customer> record) {
    LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
    LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
  }

基于此代码,有时接收到的消息可能不会反序列化为正确的Customer对象。

另外,我看到最近的一个解决方案是使用ErrorHandlingDeserializer2Spring Kafka 来处理这个问题,但是由于我使用的KafkaAvroDeserializer是我如何计算出这些配置?我目前的配置是:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

标签: javaspringapache-kafkaspring-kafka

解决方案


您需要将当前值 + 键解串器都设置为 ErrorHandlingDeserializer.class 并将当前值 + 键解串器设置为 ErrorHandlingDeserializer Key/Value 解串器属性

这看起来类似于:

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);

推荐阅读