首页 > 解决方案 > Akka Streams Unzip/Zip 是否保留顺序?

问题描述

如果我解压缩一系列元组,对两个流执行一些异步突变,然后重新压缩它们,Akka 是否保证流以相同的顺序重新压缩?

例子:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

val graph: Flow[(Int, String), (Int, String), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val unzip = builder.add(Unzip[Int, String])
  val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
  val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
  val zip = builder.add(Zip[Int, String])

  unzip.out0 ~> increment ~> zip.in0
  unzip.out1 ~> append ~> zip.in1

  FlowShape(unzip.in, zip.out)
})

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val out = Source(collection.immutable.Seq((0, "a"), (1, "b"), (2, "c")))
  .via(graph)
  .runWith(Sink.seq)

Await.result(out, 1 second)

在这个简单的测试中,输出是Vector((1,a-x), (2,b-x), (3,c-x))。所以事情看起来不错。但我不确定我是否可以相信这将永远如此。

引起一些关注的是:

val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val filter = builder.add(Flow[Int].filter(_ != 2))
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])

unzip.out0 ~> increment ~> filter ~> zip.in0
unzip.out1 ~> append ~> zip.in1

// output: Vector((1,a-x), (3,b-x))

即使保留了顺序,也不能保证会保留原始元组关系。

我可以手动检查我的流程以确保没有过滤逻辑。但是完成后,我可以确定元组会按照收到的顺序重新压缩吗?

标签: scalaakkaakka-stream

解决方案


TL;DR 是的,确实如此。来自Akka 中的流订购文档:

在 Akka Streams 中,几乎所有计算运算符都保留元素的输入顺序。这意味着如果输入{IA1,IA2,...,IAn}“原因”输出{OA1,OA2,...,OAk}和输入{IB1,IB2,...,IBm}“原因”输出{OB1,OB2,...,OBl}并且所有都IAi发生在所有之前,IBi那么都OAi发生在之前OBi

这个属性甚至被async诸如 之类的操作所支持mapAsync,但是存在一个无序版本mapAsyncUnordered,它不保留这个顺序。

然而,在处理多个输入流(例如Merge)的连接的情况下,输出顺序通常没有为到达不同输入端口的元素定义。这是一个类似合并的操作,可能会在发出Ai之前发出Bi,并且由其内部逻辑决定发出元素的顺序。然而,诸如此类的特殊元素Zip确实保证了它们的输出顺序,因为每个输出元素都依赖于已经发出信号的所有上游元素——因此在压缩的情况下的顺序是由这个属性定义的。

如果您发现自己需要在扇入场景中对发射元素的顺序进行细粒度控制,请考虑使用MergePreferred,MergePrioritizedGraphStage– 这使您可以完全控制合并的执行方式。


推荐阅读