首页 > 解决方案 > 如何根据某些条件处理 Akka 流?

问题描述

假设有一些文件流要处理,并且在满足条件时只应处理(消耗)特定文件。

即只有当流包含一个名为“aaa”的文件时,才处理一个名为“bbb”的文件

SomeFile(name: String)

这样做的正确(推荐)方法是什么?

标签: scalaakkaakka-stream

解决方案


好的,这是一个例子。小心在触发触发之前在这里建立太大的缓冲区

class FileFinder {

  def matchFiles(triggerName: String,
                 matchName: String): Flow[SomeFile, SomeFile, NotUsed] =
    Flow[SomeFile].statefulMapConcat(
      statefulMatcher(matches(triggerName), matches(matchName)))

  private def matches(matchName: String): SomeFile => Boolean = {
    case SomeFile(name) if name == matchName => true
    case _                                   => false
  }

  private def statefulMatcher(
      triggerFilter: => SomeFile => Boolean,
      sendFilter: SomeFile => Boolean): () => SomeFile => List[SomeFile] = {
    var found = false
    var sendFiles: List[SomeFile] = Nil
    () => file: SomeFile =>
      {
        file match {
          case f if triggerFilter(f) =>
            found = true
            val send = sendFiles
            sendFiles = Nil
            send
          case f if sendFilter(f) =>
            if (found)
              List(f)
            else {
              sendFiles = f :: sendFiles
              Nil
            }
          case _ => Nil
        }
      }
  }
}

object FileFinder extends FileFinder {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("finder")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val executor: ExecutionContextExecutor =
      materializer.executionContext
    implicit val loggingAdapter: LoggingAdapter = system.log

    val files = List(SomeFile("aaa"), SomeFile("bbb"), SomeFile("aaa"))
    Source(files)
      .via(matchFiles("bbb", "aaa"))
      .runForeach(println(_))
      .andThen({
        case Success(_) =>
          println("Success")
          system.terminate()
        case Failure(ex) =>
          loggingAdapter.error("Shouldn't happen...", ex)
          system.terminate()
      })
  }

}

case class SomeFile(name: String)

推荐阅读