scala - Akka Streams Unzip/Zip 是否保留顺序?
问题描述
如果我解压缩一系列元组,对两个流执行一些异步突变,然后重新压缩它们,Akka 是否保证流以相同的顺序重新压缩?
例子:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
val graph: Flow[(Int, String), (Int, String), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])
unzip.out0 ~> increment ~> zip.in0
unzip.out1 ~> append ~> zip.in1
FlowShape(unzip.in, zip.out)
})
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val out = Source(collection.immutable.Seq((0, "a"), (1, "b"), (2, "c")))
.via(graph)
.runWith(Sink.seq)
Await.result(out, 1 second)
在这个简单的测试中,输出是Vector((1,a-x), (2,b-x), (3,c-x))
。所以事情看起来不错。但我不确定我是否可以相信这将永远如此。
引起一些关注的是:
val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val filter = builder.add(Flow[Int].filter(_ != 2))
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])
unzip.out0 ~> increment ~> filter ~> zip.in0
unzip.out1 ~> append ~> zip.in1
// output: Vector((1,a-x), (3,b-x))
即使保留了顺序,也不能保证会保留原始元组关系。
我可以手动检查我的流程以确保没有过滤逻辑。但是完成后,我可以确定元组会按照收到的顺序重新压缩吗?
解决方案
TL;DR 是的,确实如此。来自Akka 中的流订购文档:
在 Akka Streams 中,几乎所有计算运算符都保留元素的输入顺序。这意味着如果输入
{IA1,IA2,...,IAn}
“原因”输出{OA1,OA2,...,OAk}
和输入{IB1,IB2,...,IBm}
“原因”输出{OB1,OB2,...,OBl}
并且所有都IAi
发生在所有之前,IBi
那么都OAi
发生在之前OBi
。这个属性甚至被
async
诸如 之类的操作所支持mapAsync
,但是存在一个无序版本mapAsyncUnordered
,它不保留这个顺序。然而,在处理多个输入流(例如
Merge
)的连接的情况下,输出顺序通常没有为到达不同输入端口的元素定义。这是一个类似合并的操作,可能会在发出Ai
之前发出Bi
,并且由其内部逻辑决定发出元素的顺序。然而,诸如此类的特殊元素Zip
确实保证了它们的输出顺序,因为每个输出元素都依赖于已经发出信号的所有上游元素——因此在压缩的情况下的顺序是由这个属性定义的。如果您发现自己需要在扇入场景中对发射元素的顺序进行细粒度控制,请考虑使用
MergePreferred
,MergePrioritized
或GraphStage
– 这使您可以完全控制合并的执行方式。
推荐阅读
- php - 将数组推入数组内部的数组
- cypress - 赛普拉斯:检查元素是否存在无异常
- amazon-web-services - AWS SSE-S3 可以保护您免受哪些攻击媒介?
- android - 如何使用来自 Android Gradle 插件的 Variant API 的新方法 variant.getGenerateBuildConfigProvider()?
- ios - 如何将数据从表视图行操作的标签发送到视图控制器?
- readfile - IOCP ReadFile 总是阻塞直到读取完成
- javascript - 我在 asp.net 中使用了浏览器仿真模式。我收到类似“尚未注册 Flash 电影”的错误。如何解决?
- jquery - css 在 jquery 的打印 div 中丢失
- java - 如何从十六进制转换为十进制以及从十进制转换为二进制
- c# - XAMLServices.Load 到接口