Scala collection not materializing in a simple Akka Streams operation

Problem description

My data from an unbounded stream source looks like this:

value1, 
value3, 
..., 
START, 
value155, 
..., 
value202, 
END, 
...,
value234,
value235, 
...
START, 
value298, 
..., 
value310, 
END, 
...,
value377, 
...

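Based on "Akka-Streams collecting data (Source -> Flow -> Flow (collect) -> Sink)", I came up with the following code, which uses Akka Streams to accumulate messages between a fixed "start key" and "end key" (here "start" and "end"):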

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats 

implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()

Source(list)
  .scan(Seq.empty[String]) { (coll, s) => 
    if(s.equals("start") || coll.head.equals("start")) 
      coll :+ s
    else
      Seq.empty[String] // return empty Seq unless new element == "start" 
                        // or first element of Seq == "start" 
  }    
  .filter(_.last.equals("end"))
  .to(Sink.foreach(println)).run()

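Alas, nothing gets through the filter at all! There is no output.

Replacing coll.head.equals and coll.last.equals with .contains does return a result, but it is of course incorrect, because from some point on the collection always contains "end".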

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats 

implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()

Source(list)
  .scan(Seq.empty[String]) { (coll, s) => 
    if(s.equals("start") || coll.contains("start")) 
      coll :+ s
    else
      Seq.empty[String]
  }    
  .filter(_.contains("end"))
  .to(Sink.foreach(println)).run()

As expected, the output is:

List(start, d4, d5, d6, d7, end)
List(start, d4, d5, d6, d7, end, d9)
List(start, d4, d5, d6, d7, end, d9, d10)

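Any suggestions on how to solve this? I suspect that some "materialization" needs to be forced along the way, or that I am running into some lazy-evaluation/actor/async issue I'm not aware of. Thanks in advance!

(At the time of writing, https://doc.akka.io/docs/akka/current/stream/stream-quickstart.html has a ready-made ScalaFiddle for quickly trying out Akka Streams.)

Edit:

To clarify "unbounded": I mean not only that the list of messages is unbounded, but also that the "start" and "end" cycle repeats. I have updated the example accordingly.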

Tags: scala, akka, akka-stream

Solution
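
On why the first attempt prints nothing: this is probably not a laziness or materialization issue. In Akka Streams, scan emits its initial value (here the empty Seq) before anything else, so _.last in the filter is called on an empty collection, and coll.head in the accumulator has the same problem until the first "start" arrives. Both throw a NoSuchElementException, which fails the stream. Because .to(Sink.foreach(println)).run() keeps only the source's materialized value, the Future[Done] that would report the failure is discarded. The sketch below reproduces the exception with scanLeft on a plain List, assuming a shortened sample list; it illustrates the failure mode and does not run an actual stream.

```scala
import scala.util.Try

// Shortened sample data (an assumption for this sketch).
val list = List("d1", "d2", "d3", "start", "d4", "end", "d9")

// Same accumulator as in the question, applied with scanLeft on a plain List.
val attempt = Try {
  list.scanLeft(Seq.empty[String]) { (coll, s) =>
    if (s == "start" || coll.head == "start") coll :+ s // coll.head throws while coll is empty
    else Seq.empty[String]
  }
}

// The filter predicate from the question fails the same way on the initial empty Seq.
val filterAttempt = Try(Seq.empty[String].last == "end")

println(attempt)
println(filterAttempt)
```

Using headOption and lastOption instead of head and last avoids the exception, and running the stream with runWith(Sink.foreach(println)) or runForeach(println) returns a Future[Done] whose failure can be inspected.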


One approach is to use statefulMapConcat:

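// Assumes the imports and the implicit ActorSystem/materializer from the question are in scope.
val source =
  Source(List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10"))

source.statefulMapConcat { () =>
  // State is created once per materialization of the stream.
  var started = false
  var ended = false

  x =>
    if (x == "start") {
      started = true
      ended = false // reset so that a repeated start/end cycle is handled too
      Nil
    } else if (x == "end") {
      ended = true
      Nil
    } else if (started && !ended) {
      x :: Nil // inside a start/end block: pass the element through
    } else {
      Nil // outside a block: drop the element
    }
}.runForeach(println)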

The code above prints the following:

d4
d5
d6
d7

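If you want to accumulate the elements between "start" and "end" rather than printing them individually as they stream through, you can adapt the snippet above to do so. Alternatively, take a look at AccumulateWhileUnchanged in the Akka Streams Contrib project.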
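
One possible adaptation is sketched below. The helper accumulateBetween is a hypothetical name, not part of Akka: it returns a fresh stateful function that buffers the elements seen after the start key and emits the whole buffer once the end key arrives. To keep the sketch runnable without Akka on the classpath, it is exercised with flatMap on a plain List; the comment at the end shows how it would presumably plug into statefulMapConcat.

```scala
// Hypothetical helper (an assumption of this sketch, not an Akka API).
def accumulateBetween(startKey: String, endKey: String): String => List[Vector[String]] = {
  // None means we are currently outside a start/end block.
  var buffer: Option[Vector[String]] = None

  element =>
    if (element == startKey) {
      buffer = Some(Vector.empty) // open (or restart) a block
      Nil
    } else if (element == endKey) {
      val completed = buffer.toList // emit the block if one was open
      buffer = None
      completed
    } else {
      buffer = buffer.map(_ :+ element) // no effect while outside a block
      Nil
    }
}

val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end",
  "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")

// List.flatMap applies the function strictly and in order, like a single stream run.
val blocks = list.flatMap(accumulateBetween("start", "end"))
blocks.foreach(println)

// With Akka Streams in scope, the same helper would be used roughly as:
//   Source(list).statefulMapConcat(() => accumulateBetween("start", "end")).runForeach(println)
```

Each block is emitted without the "start" and "end" markers themselves; prepend and append them to the buffer if the markers should be part of the result. Note that the buffer grows with the distance between the two keys, so a source that never sends the end key would accumulate without bound.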

