首页 > 解决方案 > 尽管 maxRetries < 5 且 > 0,但 Akka RestartSource 会永远重试

问题描述

我试图找出为什么我在这里得到一个无限循环:

object TestCase {
  implicit val ec = ExecutionContext.global
  implicit val actorSystem: ActorSystem           = ActorSystem()
  implicit val executionContext: ExecutionContext = actorSystem.dispatcher

  def main(args: Array[String]): Unit = {
    val finiteSource = Source(1 to 5).mapAsync(2) {
      i =>
        if(i == 2) {
          Future {
            Thread.sleep(50)
            i
            throw new RuntimeException()
          }
        } else Future.successful(i)




    }
    val tt = Source(1 to 5).flatMapConcat { i  =>
      println("--------------------------->"+ i)
      finiteSource
    }
    val forever: Source[Int, NotUsed] = RestartSource.onFailuresWithBackoff(
      minBackoff = Duration(10, MILLISECONDS),
      maxBackoff= Duration(10, MILLISECONDS),
      randomFactor=  0.1,
      maxRestarts= 2)(() => tt)



    val tt1 = forever.runWith(Sink.foreach(println))



    println(Await.result(tt1, Duration.Inf ))
  }
}

如果我运行这个,我会得到无休止的重试。但是一旦我将最小 + 最大退避更改为大于 50 毫秒的“请求时间”的值,最大重试逻辑就会起作用。

我找到错误了吗?这种行为对我来说没有意义。为什么我需要提前知道我的请求需要多长时间才能避免无限循环?

标签: scalaakka

解决方案


从文档中RestartSource.onFailuresWithBackoff

maxRestarts:在minBackoff. 传递 0 将导致不重新启动,负数不会限制重新启动的数量

Akka Streams 2.6.10 引入RestartSettings了允许设置maxRestartsWithin持续时间的功能。

请参阅这个 github 问题建议RestartSettings:一旦 a 中的源RestartSource运行了minBackoff,它实际上是不朽的,因为其背后的意图RestartSource是它用于将很快失败的源。


推荐阅读