java - 在 Kafka Stream 中处理消息时发生错误时重新处理消息
问题描述
我有一个基于 Kafka Streams 的简单 Spring 应用程序,它使用来自传入主题的消息,进行map
转换并打印此消息。KStream
像这样配置
@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
PrintAction printAction, String topicName) {
KStream<String, JsonNode> source = builder.stream(topicName,
Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
// @formatter:off
source
.map(myTransformer)
.foreach(printAction);
// @formatter:on
return source;
}
在里面MyTransformer
我正在调用外部微服务,此时它可能会关闭。如果调用失败(通常是 throwing RuntimeException
),我无法进行转换。
问题是如果在之前的处理过程中发生任何错误,有什么方法可以再次重新处理 Streams 应用程序中的消息?
根据我目前的研究,这里没有办法这样做,我唯一的可能是将消息推送到死信主题中,并在将来再次失败时尝试处理它我再次将其推送到 DLT 并以这种方式重试.
解决方案
如果在 Kafka Streams 处理期间发生任何未捕获的异常,您的流将状态更改为 ERROR 并停止消费发生错误的分区的传入消息。您需要自己捕获异常。重试可以通过以下方式实现:1)使用 SpringRetryTemplate
调用外部微服务(但请记住,您将延迟使用来自特定分区的消息),或 2)将失败的消息推送到另一个主题以供以后重新处理(如您所建议的)
更新自kafka-streams
2.8.0
因为,您可以使用方法自动替换kafka-streams
2.8.0
失败的流线程(由未捕获的异常引起)。有关更多详细信息,请查看Kafka Streams Specific Uncaught Exception HandlerKafkaStreams
void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);
StreamThreadExceptionResponse.REPLACE_THREAD
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
推荐阅读
- php - 使用 echo 问题设置 PHP 样式
- docker - 从 github 操作运行时 Docker 守护程序访问被拒绝
- android - 从 Android 项目中删除 Kotlin 扩展后自定义对话框出现问题
- vue.js - 在 vuex 状态改变后更新所有客户端
- flutter - 如何将变量作为参数传递给小部件二,在那里对其进行修改,并将修改后的值返回给小部件一,Flutter
- python - Pandas:查找excel表中是否存在列名?
- angular - 等待数据恢复结束后再阅读更多
- sql - 如何使用窗口函数计算当前周/当年与同一周数/去年的指标?
- google-chrome - Chrome.identity.launchWebAuthFlow 可能与 gapi 存在 CORS 问题(谷歌登录)
- reactjs - 添加去抖动后数据已停止返回