首页 > 解决方案 > Spring Cloud Stream Kafka - 如何在批处理模式下处理异常

问题描述

对于我的卡夫卡批量消费者,我想处理两种类型的情况。

  1. 在侦听器收到记录之前发生的反序列化异常。重试这些没什么意义,直接发送到DLQ。

  2. 侦听器内发生的异常。当我对批处理应用一些流操作(过滤和分组等)时,很难确切知道批处理中的哪条记录失败。所以重试整个批次,如果达到重试限制,将整个批次发送到 DLQ,对我来说更简单也可以接受。


反馈后编辑

我已经能够成功地使用 ListenerContainerCustomizer 来解决方案 2。

现在我仍然需要处理场景 1(反序列化异常)。我尝试过使用 failedDeserializationFunction,如此处所述https://docs.spring.io/spring-kafka/docs/2.5.12.RELEASE/reference/html/#error-handling-deserializer

@Getter
public class BadRequestEvent extends RequestEvent {

    private final FailedDeserializationInfo failedDeserializationInfo;

    public BadRequestEvent(FailedDeserializationInfo failedDeserializationInfo) {
        this.failedDeserializationInfo = failedDeserializationInfo;
    }

}

public class FailedRequestEventProvider implements Function<FailedDeserializationInfo, RequestEvent> {

    @Override
    public RequestEvent apply(FailedDeserializationInfo info) {
        return new BadRequestEvent(info);
    }

}

配置

spring:
  kafka.consumer.properties:
        value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.IntegerDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.function: com.consumer.domain.event.FailedRequestEventProvider
        spring.json.value.default.type: com.consumer.domain.event.RequestEvent
 cloud.stream:
  kafka:
    binder:
      brokers:
       - broker1:9092
       - broker2:9092
       - broker3:9092
       - broker4:9092
  function:
    definition: receive
  bindings:
    receive-in-0:
      destination: RequestEvent
      group: requestConsumer
      consumer:
        batch-mode: true

我可以在消费者中看到 BadRequestEvent 正在正确构建并且确实包含 FailedDeserializationInfo。但是当它最终被发送到监听器时,它被序列化回 RequestEvent 并且信息丢失了。

好像我在这里遗漏了一些东西。我的印象是听众会收到 BadRequestEvents 和 RequestEvents 的列表?然后可以根据需要处理这些

标签: spring-bootspring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


Spring Cloud Stream 不使用ConcurrentKafkaListenerContainerFactoryfrom Boot;要配置错误处理程序,您需要添加一个ListenerContainerCustomizer<AbstractKafkaListenerContainerFactory>bean。

最好的解决方案是RecoveringBatchErrorHandler,您可以在其中抛出一个BatchListenerFailedException告诉错误处理程序哪个记录失败。

还有RetryingBatchErrorHandler将重试整个批次,并可选择将其发送到死信主题。

在任何一种情况下,DeadLetterPublishingRecoverer在错误处理程序中使用 a 并且不要在活页夹中启用 DLQ。


推荐阅读