首页 > 解决方案 > 使用折叠的 Akka 流程未完成

问题描述

我有一些大致如下所示的代码:其中 A 是两个 Maps 的元组

def methodName(): Flow[A, B, NotUsed] = {
  val filter = Flow[A].map(a => a._2.slice(0, 2))
  val split = Flow[A._2]
    .mapConcat(identity)
    .map(t => {
      B.random
    })
    .fold(B.empty)((a, b) => {
      new B(a._1, a._2 ++ Seq(b._1), a._3 ++ Seq(b._2), a._4)
    })

  val logK = Flow[B].log("K", c => {
    log.info("here")
  })
  filter.via(split).via(logK)
}

但是当我运行这个时,流在折叠阶段停止,我不明白为什么。我可以确认 A._2 中的集合已完全用尽,当我用不同的操作替换折叠时,流程继续进行并且没有被阻塞。据我所知,上游mapConcat正在调用completeStage。所以我不确定为什么折叠阶段没有接到那个电话并且知道继续下一阶段。

标签: scalaakka-stream

解决方案


所以看来这是我使用的akka​​版本的一个bug:akka: "2.5.23", akkaHttp: "10.1.10"

当我升级到 akka: "2.6.8" 和 akkaHttpV = "10.2.0" 时,一切都按预期工作


推荐阅读