scala - 如何在 Akka 流中合并任意数量的源?
问题描述
我有n
想在 Akka 流中按优先级合并的源。我的实现基于GraphMergePrioritiziedSpec,其中合并了三个优先源。我试图Source
用以下方法抽象出 s 的数量:
import akka.NotUsed
import akka.stream.{ClosedShape, Graph, Materializer}
import akka.stream.scaladsl.{GraphDSL, MergePrioritized, RunnableGraph, Sink, Source}
import org.apache.activemq.ActiveMQConnectionFactory
class SourceMerger(
sources: Seq[Source[java.io.Serializable, NotUsed]],
priorities: Seq[Int],
private val sink: Sink[java.io.Serializable, _]
) {
require(sources.size == priorities.size, "Each source should have a priority")
import GraphDSL.Implicits._
private def partial(
sources: Seq[Source[java.io.Serializable, NotUsed]],
priorities: Seq[Int],
sink: Sink[java.io.Serializable, _]
): Graph[ClosedShape, NotUsed] = GraphDSL.create() { implicit b =>
val merge = b.add(MergePrioritized[java.io.Serializable](priorities))
sources.zipWithIndex.foreach { case (s, i) =>
s.shape.out ~> merge.in(i)
}
merge.out ~> sink
ClosedShape
}
def merge(
sources: Seq[Source[java.io.Serializable, NotUsed]],
priorities: Seq[Int],
sink: Sink[java.io.Serializable, _]
): RunnableGraph[NotUsed] = RunnableGraph.fromGraph(partial(sources, priorities, sink))
def run()(implicit mat: Materializer): NotUsed = merge(sources, priorities, sink).run()(mat)
}
但是,运行以下存根时出现错误:
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.{Matchers, WordSpecLike}
import akka.testkit.TestKit
import scala.collection.immutable.Iterable
class SourceMergerSpec extends TestKit(ActorSystem("SourceMerger")) with WordSpecLike with Matchers {
implicit val materializer: Materializer = ActorMaterializer()
"A SourceMerger" should {
"merge by priority" in {
val priorities: Seq[Int] = Seq(1,2,3)
val highPriority = Iterable("message1", "message2", "message3")
val mediumPriority = Iterable("message4", "message5", "message6")
val lowPriority = Iterable("message7", "message8", "message9")
val source1 = Source[String](highPriority)
val source2 = Source[String](mediumPriority)
val source3 = Source[String](lowPriority)
val sources = Seq(source1, source2, source3)
val subscriber = Sink.seq[java.io.Serializable]
val merger = new SourceMerger(sources, priorities, subscriber)
merger.run()
source1.runWith(Sink.foreach(println))
}
}
}
相关的堆栈跟踪在这里:
[StatefulMapConcat.out] is already connected
java.lang.IllegalArgumentException: [StatefulMapConcat.out] is already connected
at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1304)
at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase$class.$tilde$greater(Graph.scala:1431)
at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1521)
at SourceMerger$$anonfun$partial$1$$anonfun$apply$1.apply(SourceMerger.scala:26)
at SourceMerger$$anonfun$partial$1$$anonfun$apply$1.apply(SourceMerger.scala:25)
似乎错误来自于此:
sources.zipWithIndex.foreach { case (s, i) =>
s.shape.out ~> merge.in(i)
}
Source
是否可以在 Akka 流 Graph DSL 中合并任意数量的s?如果是这样,为什么我的尝试没有成功?
解决方案
代码示例的主要问题
问题中提供的代码片段的一个大问题source1
是连接到Sink
来自merge
调用和Sink.foreach(println)
. 如果没有中间扇出元素Source
,则无法将其连接到多个接收器。
删除Sink.foreach(println)
可能会彻底解决您的问题。
简化设计
Source
基于来自特定的所有消息具有相同优先级的事实,可以简化合并。这意味着您可以按各自的优先级对源进行排序,然后将它们连接在一起:
private def partial(sources: Seq[Source[java.io.Serializable, NotUsed]],
priorities: Seq[Int],
sink: Sink[java.io.Serializable, _]): RunnableGraph[NotUsed] =
sources.zip(priorities)
.sortWith(_._2 < _._2)
.map(_._1)
.reduceOption(_ ++ _)
.getOrElse(Source.empty[java.io.Serializable])
.to(sink)
推荐阅读
- reactjs - 将 React 类组件转换为函数组件
- java - 如何从android中的改造入队方法更新进度条
- angular - Angular - 导航到新页面后重置不活动重定向
- mysql - 在三叉树中插入节点
- jenkins - 将按钮添加到将触发 Jenkins 作业的 Github 推送请求
- javascript - VueJS:过滤选择列表
- java - 原因:mapbox 依赖项添加期间的 java.lang.RuntimeException
- osgi - 如何使用 java 和远程 karaf 容器以编程方式部署、启动、停止 OSGI 包?
- javascript - Vuetify v-file-input 仅带有图标
- python - 将 Python 程序转换为 C/C++,然后转换为 RISC-V 可执行文件