java - Spring Kafka Consumer 如何从 Avro Deserializer 异常中跳过
问题描述
我正在使用 Spring Kafka 消费者和 Avro 模式来构建我的应用程序。
但是,如果消息无法反序列化到我构建的指定 Avro 特定记录,则消费者将不断重试消费者相同的消息(无限重试)。
对于这种情况,如果我的消费者发生反序列化程序异常,我如何配置消费者应用程序以跳过当前消息并移动到下一个偏移量。
我查看了 Spring Kafka 错误句柄,它只能在侦听器中处理异常,而不是在反序列化阶段。
我的消费者应用程序非常简单:
@KafkaListener(id = "demo-consumer-stream-group", topics = "customer-output-")
public void process(ConsumerRecord<String, Customer> record) {
LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
}
基于此代码,有时接收到的消息可能不会反序列化为正确的Customer
对象。
另外,我看到最近的一个解决方案是使用ErrorHandlingDeserializer2
Spring Kafka 来处理这个问题,但是由于我使用的KafkaAvroDeserializer
是我如何计算出这些配置?我目前的配置是:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
解决方案
您需要将当前值 + 键解串器都设置为 ErrorHandlingDeserializer.class 并将当前值 + 键解串器设置为 ErrorHandlingDeserializer Key/Value 解串器属性
这看起来类似于:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
推荐阅读
- javascript - 通过类名获取输入字段的值,其中类名是变量
- java - 片段android的缓慢创建
- .net - Azure 应用服务和.NET Framework 4.8 版本
- c# - 集合运算(补和差)
- mysql - 在MYSQL中使用另一个表的属性值作为另一个表的属性的默认值
- apache-kafka - Kafka Connect Protobuf 配置
- intershop - 使用 ISSLOT 标签获取插槽项目的位置
- python - 如何获取按另一列分组的列出现的百分比?Python
- gitlab - 在gitlab上创建合并请求时如何添加标签?
- spring-boot - Kubernetes - SpringBoot - Neo4j - 启动时出错