scala - 尽管 maxRetries < 5 且 > 0,但 Akka RestartSource 会永远重试
问题描述
我试图找出为什么我在这里得到一个无限循环:
object TestCase {
implicit val ec = ExecutionContext.global
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val executionContext: ExecutionContext = actorSystem.dispatcher
def main(args: Array[String]): Unit = {
val finiteSource = Source(1 to 5).mapAsync(2) {
i =>
if(i == 2) {
Future {
Thread.sleep(50)
i
throw new RuntimeException()
}
} else Future.successful(i)
}
val tt = Source(1 to 5).flatMapConcat { i =>
println("--------------------------->"+ i)
finiteSource
}
val forever: Source[Int, NotUsed] = RestartSource.onFailuresWithBackoff(
minBackoff = Duration(10, MILLISECONDS),
maxBackoff= Duration(10, MILLISECONDS),
randomFactor= 0.1,
maxRestarts= 2)(() => tt)
val tt1 = forever.runWith(Sink.foreach(println))
println(Await.result(tt1, Duration.Inf ))
}
}
如果我运行这个,我会得到无休止的重试。但是一旦我将最小 + 最大退避更改为大于 50 毫秒的“请求时间”的值,最大重试逻辑就会起作用。
我找到错误了吗?这种行为对我来说没有意义。为什么我需要提前知道我的请求需要多长时间才能避免无限循环?
解决方案
从文档中RestartSource.onFailuresWithBackoff
maxRestarts
:在minBackoff
. 传递 0 将导致不重新启动,负数不会限制重新启动的数量
Akka Streams 2.6.10 引入RestartSettings
了允许设置maxRestartsWithin
持续时间的功能。
请参阅这个 github 问题建议RestartSettings
:一旦 a 中的源RestartSource
运行了minBackoff
,它实际上是不朽的,因为其背后的意图RestartSource
是它用于将很快失败的源。
推荐阅读
- php - ffmpeg:通过 php xampp (mac) 运行 ffmpeg 时找不到
- salesforce - 如何使用同一个连接的应用程序访问多个 Salesforce 组织
- php - Php 没有错误消息,但插入的数据没有出现在 phpmyadmin 的表中
- python - TensorFlow 图像处理错误:“TypeError:'MapDataset' 对象不可下标”
- python - 如何将我的 pythonOperator(ShortCircuitOperator) 中的值传递给我的 postgresOperator 到 Airflow 中的红移查询
- dask - 在 SageMath 中运行时使用 Dask 会引发 ImportError
- javascript - 如何在Javascript中用旧数组制作新数组
- c# - 如何在不知道匹配返回的 json 数据中的所有值的情况下从 .Net 核心中的 Web api 调用创建模型?
- rdcomclient - 安装 R (D)COM Server V1.35 并与 Excel 集成时出现“找不到连接器”错误
- python - 为什么类方法找不到对象初始化中定义的属性?