首页 > 解决方案 > 在 Kafka Stream 中处理消息时发生错误时重新处理消息

问题描述

我有一个基于 Kafka Streams 的简单 Spring 应用程序,它使用来自传入主题的消息,进行map转换并打印此消息。KStream像这样配置

@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
         PrintAction printAction, String topicName) {
    KStream<String, JsonNode> source = builder.stream(topicName,
                Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
    // @formatter:off
    source
        .map(myTransformer)
        .foreach(printAction);
    // @formatter:on
    return source;
}

在里面MyTransformer我正在调用外部微服务,此时它可能会关闭。如果调用失败(通常是 throwing RuntimeException),我无法进行转换。

问题是如果在之前的处理过程中发生任何错误,有什么方法可以再次重新处理 Streams 应用程序中的消息?

根据我目前的研究,这里没有办法这样做,我唯一的可能是将消息推送到死信主题中,并在将来再次失败时尝试处理它我再次将其推送到 DLT 并以这种方式重试.

标签: javaapache-kafkaapache-kafka-streamsspring-kafka

解决方案


如果在 Kafka Streams 处理期间发生任何未捕获的异常,您的流将状态更改为 ERROR 并停止消费发生错误的分区的传入消息。您需要自己捕获异常。重试可以通过以下方式实现:1)使用 SpringRetryTemplate调用外部微服务(但请记住,您将延迟使用来自特定分区的消息),或 2)将失败的消息推送到另一个主题以供以后重新处理(如您所建议的)


更新自kafka-streams 2.8.0

因为,您可以使用方法自动替换kafka-streams 2.8.0失败的流线程(由未捕获的异常引起)。有关更多详细信息,请查看Kafka Streams Specific Uncaught Exception HandlerKafkaStreamsvoid setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

推荐阅读