首页 > 解决方案 > 尽管使用了 RestartSource,但 Alpakka Kafka Stream 在连接错误后没有重新启动

问题描述

我有一个简单的可提交源,用于包装在 RestartSource 中的 Kafka 流。它在愉快的路径中运行良好,但如果我故意严重连接到 Kafka 集群,它会从底层 kafka 客户端引发连接异常并报告 Kafka Consumer Shut Down。我的期望是在约 150 秒后重新启动流,但事实并非如此。从下面我对 RestartSource 的理解/使用是否不正确:

val atomicControl = new AtomicReference[Consumer.Control](NoopControl)
val restartablekafkaSourceWithFlow = {
        RestartSource.withBackoff(30.seconds, 120.seconds, 0.2) {
          () => {
            Consumer.committableSource(consumerSettings.withClientId("clientId"), Subscriptions.topics(Set("someTopic")))
              .mapMaterializedValue(c => atomicControl.set(c))
              .via(someFlow)
              .via(httpFlow)
          }
        }
      }
val committerSink: Sink[(Any, ConsumerMessage.CommittableOffset), Future[Done]] = Committer.sinkWithOffsetContext(CommitterSettings(actorSystem))

val runnableGraph = restartablekafkaSourceWithFlow.toMat(committerSink)(Keep.both)

val control = runnableGraph.mapMaterializedValue(x => Consumer.DrainingControl.apply(atomicControl.get, x._2)).run()

标签: apache-kafkaakkaakka-streamalpakka

解决方案


也许您在RestartSource.

您可以添加recover以查看错误,和/或创建如下所示的决策程序并在runnableGraph.

private val decider: Supervision.Decider = { e =>
    logger.error("Unhandled exception in stream.", e)
    Supervision.Resume
  }

runnableGraph.withAttributes(supervisionStrategy(decider))

推荐阅读