首页 > 解决方案 > 你如何处理 Akka Flow 中的 futures 和 mapAsync?

问题描述

我构建了一个定义简单流程的 akka 图 DSL。但是流 f4 需要 3 秒来发送一个元素,而流 f2 需要 10 秒。

结果,我得到了:3、2、3、2。但是,这不是我想要的。由于 f2 需要太多时间,我想得到:3、3、2、2。这是代码......

implicit val actorSystem = ActorSystem("NumberSystem")
implicit val materializer = ActorMaterializer()

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(List(1, 1))
  val out = Sink.foreach(println)

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))



  val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
  val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)

  val f1, f3 = Flow[Int]
  val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper)
  val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper2)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
  bcast ~> f4 ~> merge
  ClosedShape
})
g.run()

那么我哪里错了?未来还是 mapAsync ?否则...谢谢

标签: scalaconcurrencyparallel-processingakkaakka-stream

解决方案


对不起,我是akka的新手,所以我还在学习。要获得预期的结果,一种方法是放置 async :

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(List(1, 1))
  val out = Sink.foreach(println)

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))



  val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
  val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)

  val f1, f3 = Flow[Int]
  val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).map(_+1)
    //.mapAsyncUnordered[Int](2)(yourMapper)
  val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).map(_+2)
    //.mapAsync[Int](2)(yourMapper2)

  in ~> f1 ~> bcast ~> f2.async ~> merge ~> f3 ~> out
  bcast ~> f4.async ~> merge
  ClosedShape
})
g.run()

推荐阅读