apache-kafka - 尽管使用了 RestartSource,但 Alpakka Kafka Stream 在连接错误后没有重新启动
问题描述
我有一个简单的可提交源,用于包装在 RestartSource 中的 Kafka 流。它在愉快的路径中运行良好,但如果我故意严重连接到 Kafka 集群,它会从底层 kafka 客户端引发连接异常并报告 Kafka Consumer Shut Down。我的期望是在约 150 秒后重新启动流,但事实并非如此。从下面我对 RestartSource 的理解/使用是否不正确:
val atomicControl = new AtomicReference[Consumer.Control](NoopControl)
val restartablekafkaSourceWithFlow = {
RestartSource.withBackoff(30.seconds, 120.seconds, 0.2) {
() => {
Consumer.committableSource(consumerSettings.withClientId("clientId"), Subscriptions.topics(Set("someTopic")))
.mapMaterializedValue(c => atomicControl.set(c))
.via(someFlow)
.via(httpFlow)
}
}
}
val committerSink: Sink[(Any, ConsumerMessage.CommittableOffset), Future[Done]] = Committer.sinkWithOffsetContext(CommitterSettings(actorSystem))
val runnableGraph = restartablekafkaSourceWithFlow.toMat(committerSink)(Keep.both)
val control = runnableGraph.mapMaterializedValue(x => Consumer.DrainingControl.apply(atomicControl.get, x._2)).run()
解决方案
也许您在RestartSource
.
您可以添加recover
以查看错误,和/或创建如下所示的决策程序并在runnableGraph
.
private val decider: Supervision.Decider = { e =>
logger.error("Unhandled exception in stream.", e)
Supervision.Resume
}
runnableGraph.withAttributes(supervisionStrategy(decider))
推荐阅读
- javascript - 使用 javascript 从 html 输入中添加元素并向 html 添加/删除行
- python - 根据其他数据框中的值检索一行数据框中的值
- python-3.x - 在不同设备上的 2 个 python 程序之间传输数据
- java - 如何从 ConcurrentHashMap 中删除用作值的 Set
>? - macos - 每次我使用并行时我都必须下载debian吗?
- java - 为什么不会打印数组的正确中位数?
- python - 检测图像中网络链接周围的网络创建矩形
- javascript - 多次调用 React 组件
- android - 自定义项目布局气泡像 Messenger 一样聊天
- javascript - 如何在无状态 ScrollViewRTL 组件中使用 ref?