spring-boot - Spring Cloud Stream Kafka - 如何在批处理模式下处理异常
问题描述
对于我的卡夫卡批量消费者,我想处理两种类型的情况。
在侦听器收到记录之前发生的反序列化异常。重试这些没什么意义,直接发送到DLQ。
侦听器内发生的异常。当我对批处理应用一些流操作(过滤和分组等)时,很难确切知道批处理中的哪条记录失败。所以重试整个批次,如果达到重试限制,将整个批次发送到 DLQ,对我来说更简单也可以接受。
反馈后编辑
我已经能够成功地使用 ListenerContainerCustomizer 来解决方案 2。
现在我仍然需要处理场景 1(反序列化异常)。我尝试过使用 failedDeserializationFunction,如此处所述https://docs.spring.io/spring-kafka/docs/2.5.12.RELEASE/reference/html/#error-handling-deserializer
@Getter
public class BadRequestEvent extends RequestEvent {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadRequestEvent(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
}
public class FailedRequestEventProvider implements Function<FailedDeserializationInfo, RequestEvent> {
@Override
public RequestEvent apply(FailedDeserializationInfo info) {
return new BadRequestEvent(info);
}
}
配置
spring:
kafka.consumer.properties:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.IntegerDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.deserializer.value.function: com.consumer.domain.event.FailedRequestEventProvider
spring.json.value.default.type: com.consumer.domain.event.RequestEvent
cloud.stream:
kafka:
binder:
brokers:
- broker1:9092
- broker2:9092
- broker3:9092
- broker4:9092
function:
definition: receive
bindings:
receive-in-0:
destination: RequestEvent
group: requestConsumer
consumer:
batch-mode: true
我可以在消费者中看到 BadRequestEvent 正在正确构建并且确实包含 FailedDeserializationInfo。但是当它最终被发送到监听器时,它被序列化回 RequestEvent 并且信息丢失了。
好像我在这里遗漏了一些东西。我的印象是听众会收到 BadRequestEvents 和 RequestEvents 的列表?然后可以根据需要处理这些
解决方案
Spring Cloud Stream 不使用ConcurrentKafkaListenerContainerFactory
from Boot;要配置错误处理程序,您需要添加一个ListenerContainerCustomizer<AbstractKafkaListenerContainerFactory>
bean。
最好的解决方案是RecoveringBatchErrorHandler,您可以在其中抛出一个BatchListenerFailedException
告诉错误处理程序哪个记录失败。
还有RetryingBatchErrorHandler将重试整个批次,并可选择将其发送到死信主题。
在任何一种情况下,DeadLetterPublishingRecoverer
在错误处理程序中使用 a 并且不要在活页夹中启用 DLQ。
推荐阅读
- python - 如何使用 python 代码从二进制文件中读取和提取值?
- python - 关闭emacs中显示大色块的提示
- python - 通过 python 文件到 Robot 框架的命令行参数
- jquery - 在表格主体上方的表格标题中显示选项
- django - 登录期间导致 csrf_token 问题的 Django CustomMiddleware
- java - 如何在微调器上选择项目?
- bash - 如何在不清除屏幕的情况下刷新多行变量输出
- scrapy - 带有 Scrapy 的 JSONDecodeError:预期值:第 1 行第 1 列(字符 0)
- vue.js - 如何在与表单无关的元素上使用 v-model?
- c# - 此项目不支持脚手架 Visual Studio 2019