scala - Scala 集合未在简单的 Akka Streams 操作中实现
问题描述
我来自未绑定流源的数据如下所示:
value1,
value3,
...,
START,
value155,
...,
value202,
END,
...,
value234,
value235,
...
START,
value298,
...,
value310,
END,
...,
value377,
...
基于Akka-Streams 收集数据 (Source -> Flow -> Flow (collect) -> Sink),我想出了以下代码,使用 Akka Streams 在固定的“开始键”和“结束键”之间累积消息(这里开始和结束):
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats
implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()
Source(list)
.scan(Seq.empty[String]) { (coll, s) =>
if(s.equals("start") || coll.head.equals("start"))
coll :+ s
else
Seq.empty[String] // return empty Seq unless new element == "start"
// or first element of Seq == "start"
}
.filter(_.last.equals("end"))
.to(Sink.foreach(println)).run()
唉,根本没有任何东西可以通过过滤器!没有输出。
coll.head.equals
将and替换coll.last.equals
为.contains
, 返回一个结果,当然这是不正确的,因为在某些时候总是包含“end”。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats
implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()
Source(list)
.scan(Seq.empty[String]) { (coll, s) =>
if(s.equals("start") || coll.contains("start"))
coll :+ s
else
Seq.empty[String]
}
.filter(_.contains("end"))
.to(Sink.foreach(println)).run()
正如预期的那样,输出是:
List(start, d4, d5, d6, d7, end)
List(start, d4, d5, d6, d7, end, d9)
List(start, d4, d5, d6, d7, end, d9, d10)
关于如何解决这个问题的任何建议?我怀疑在此过程中需要强制执行一些“物化”,否则我可能会遇到一些我不知道的惰性 eval/actor/async 问题。提前致谢!
(在撰写本文时,https: //doc.akka.io/docs/akka/current/stream/stream-quickstart.html 有一个现成的 ScaleFiddle 可以快速使用 Akka Streams)
编辑:
澄清“未绑定” - 我的意思是,消息列表不仅未绑定,而且“开始”和“结束”循环也重复。我已经相应地更新了这个例子。
解决方案
一种方法是使用statefulMapConcat
:
val source =
Source(List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10"))
source.statefulMapConcat { () =>
var started = false
var ended = false
x =>
if (x == "start") {
started = true
Nil
} else if (x == "end") {
ended = true
Nil
} else if (started && !ended) {
x :: Nil
} else {
Nil
}
}.runForeach(println)
上面的代码打印以下内容:
d4
d5
d6
d7
如果您想在“开始”和“结束”之间累积元素,而不是在流式基础上单独打印这些元素,您可以调整上面的代码片段来做到这一点。AccumulateWhileUnchanged
或者,查看Akka Streams Contrib项目。
推荐阅读
- javascript - typeof Array'instance 可以在 JavaScript 中返回 'Array' 吗?
- go - 如何转换结构字段名称中的任何字符串,例如
- r - array_reshape() 没有根据需要重塑数组
- django - 在 Django 中注册自定义用户时出错
- javascript - 在灯箱内容中使用 javascript 函数?
- docker - docker push 时的权限问题
- java - 函数运行两次不知道为什么
- javascript - Firebase 身份验证 - 用户对象仅在首次成功注册后才变为空
- c++ - 抽象类类型“A”的无效新表达式
- maven - 在 gradle 库中包含 JAR 文件的字节码与仅将其添加为依赖项