apache-kafka - 如何重试 KStream 消息?
问题描述
我的任务是为 Kafka KStreams 处理器中收到的消息实现重试机制。拓扑是这样的(更改名称/方法以保护无辜者):
KStream<String, GenericRecord> inputStream = kStreamBuilder.stream(inputTopic,Consumed.with(Serdes.String(), getSerDe()));
return inputStream
.peek((key,value) -> metricsService.startTimer())
.map((key, value) -> KeyValue.pair(new MessageKey(key), mapInputMessageToInternalMessage(key, value)))
.mapValues(tokenizationService::detokenize)
.mapValues(outputMapper::mapInternalMessageToOutputMessage)
.peek((key, value) -> loggingUtil.startTransactionLog(value.getInputMessage()))
.mapValues(someOtherProcessor::writeSomewhereElse)
.filter(this::filterStream)
.mapValues(this::toOutputMessageWrapper)
;
我意识到我遗漏了很多,但总的来说,如果它在中间的某个地方中断,我需要能够“重试”这个流程。例如,detokenize
是一个外部 REST API,可能会遇到暂时性错误。如果是这样,我希望能够以指数退避计划重试,希望它成功。
今天,在每个回调中都有检查,mapValues
以查看上一步是否发生错误,如果是,它们基本上就失败了。在这些情况下,toOutputMessageWrapper
将阻止消息发送到输出主题。那时我需要做的是从顶部重试整个事情。
我目前看到的唯一可能的方法(我远非 Kafka/KakfaStreams 专家)是在 KStream 的每个阶段(在各种mapValues
方法中,在这种情况下)实现重试机制,而不是尝试“开始超过”。
还有其他建议吗?
解决方案
推荐阅读
- azure-functions - Azure 服务总线主题的 EventGridTrigger
- android - cordova-plugin-firebase 中的电话号码无效
- reactjs - 延迟加载 react-router-dom,webpack 不工作
- appium - Appium 问题:找不到 aapt 请使用 Android SDK 根目录路径设置 ANDROID_HOME 环境变量
- azure - 我们可以将 logicapps 参数分布在多个文件中吗?
- laravel - 从 eloquent 模型的相关表中获取格式化属性
- python - 如何避免“IndexError:列表索引超出范围”错误?
- python - 将 ID 值分配给共享多个特征的 obs
- rest - 用于移动应用程序 iOS 和 Android 的 REST API 身份验证
- python - 如何实现 Pygame 定时游戏循环?