首页 > 解决方案 > 如何重试 KStream 消息?

问题描述

我的任务是为 Kafka KStreams 处理器中收到的消息实现重试机制。拓扑是这样的(更改名称/方法以保护无辜者):

KStream<String, GenericRecord> inputStream = kStreamBuilder.stream(inputTopic,Consumed.with(Serdes.String(), getSerDe()));
        return inputStream
                .peek((key,value) -> metricsService.startTimer())
                .map((key, value) -> KeyValue.pair(new MessageKey(key), mapInputMessageToInternalMessage(key, value)))
                .mapValues(tokenizationService::detokenize)
                .mapValues(outputMapper::mapInternalMessageToOutputMessage)
                .peek((key, value) -> loggingUtil.startTransactionLog(value.getInputMessage()))
                .mapValues(someOtherProcessor::writeSomewhereElse)
                .filter(this::filterStream)
                .mapValues(this::toOutputMessageWrapper)
                ;

我意识到我遗漏了很多,但总的来说,如果它在中间的某个地方中断,我需要能够“重试”这个流程。例如,detokenize是一个外部 REST API,可能会遇到暂时性错误。如果是这样,我希望能够以指数退避计划重试,希望它成功。

今天,在每个回调中都有检查,mapValues以查看上一步是否发生错误,如果是,它们基本上就失败了。在这些情况下,toOutputMessageWrapper将阻止消息发送到输出主题。那时我需要做的是从顶部重试整个事情。

我目前看到的唯一可能的方法(我远非 Kafka/KakfaStreams 专家)是在 KStream 的每个阶段(在各种mapValues方法中,在这种情况下)实现重试机制,而不是尝试“开始超过”。

还有其他建议吗?

标签: apache-kafkaapache-kafka-streams

解决方案


推荐阅读