scala - 你如何处理 Akka Flow 中的 futures 和 mapAsync?
问题描述
我构建了一个定义简单流程的 akka 图 DSL。但是流 f4 需要 3 秒来发送一个元素,而流 f2 需要 10 秒。
结果,我得到了:3、2、3、2。但是,这不是我想要的。由于 f2 需要太多时间,我想得到:3、3、2、2。这是代码......
implicit val actorSystem = ActorSystem("NumberSystem")
implicit val materializer = ActorMaterializer()
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(List(1, 1))
val out = Sink.foreach(println)
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)
val f1, f3 = Flow[Int]
val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper)
val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper2)
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})
g.run()
那么我哪里错了?未来还是 mapAsync ?否则...谢谢
解决方案
对不起,我是akka的新手,所以我还在学习。要获得预期的结果,一种方法是放置 async :
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(List(1, 1))
val out = Sink.foreach(println)
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)
val f1, f3 = Flow[Int]
val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).map(_+1)
//.mapAsyncUnordered[Int](2)(yourMapper)
val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).map(_+2)
//.mapAsync[Int](2)(yourMapper2)
in ~> f1 ~> bcast ~> f2.async ~> merge ~> f3 ~> out
bcast ~> f4.async ~> merge
ClosedShape
})
g.run()
推荐阅读
- python - 在空行之间加入行 - python
- php - 如果使用 PHP 匹配特定条件,则无法从 JSON 数组中删除对象。代码抛出错误
- android - RxJava错误处理 由于缺少异常没有处理
- tensorflow - 为什么我的训练趋势是一条直线?
- spring-cloud-stream - 怎么知道绑定是活的?
- android - OnResponse 不适用于 Volley(从谷歌表格中读取数据)
- java - Selenium Java - 使用 Getter 和 Setter 的 TestNG 数据提供者
- node.js - 无法在 node.js 中获取选择菜单的值
- javascript - wow滑块在angularjs ng-repeat中通过http.get加载图像时不起作用
- javascript - 从一个页面重定向到另一个页面而不在 Routes.js 文件中显示该文件路径