scala - 如何根据某些条件处理 Akka 流?
问题描述
假设有一些文件流要处理,并且在满足条件时只应处理(消耗)特定文件。
即只有当流包含一个名为“aaa”的文件时,才处理一个名为“bbb”的文件
SomeFile(name: String)
这样做的正确(推荐)方法是什么?
解决方案
好的,这是一个例子。小心在触发触发之前在这里建立太大的缓冲区
class FileFinder {
def matchFiles(triggerName: String,
matchName: String): Flow[SomeFile, SomeFile, NotUsed] =
Flow[SomeFile].statefulMapConcat(
statefulMatcher(matches(triggerName), matches(matchName)))
private def matches(matchName: String): SomeFile => Boolean = {
case SomeFile(name) if name == matchName => true
case _ => false
}
private def statefulMatcher(
triggerFilter: => SomeFile => Boolean,
sendFilter: SomeFile => Boolean): () => SomeFile => List[SomeFile] = {
var found = false
var sendFiles: List[SomeFile] = Nil
() => file: SomeFile =>
{
file match {
case f if triggerFilter(f) =>
found = true
val send = sendFiles
sendFiles = Nil
send
case f if sendFilter(f) =>
if (found)
List(f)
else {
sendFiles = f :: sendFiles
Nil
}
case _ => Nil
}
}
}
}
object FileFinder extends FileFinder {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("finder")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executor: ExecutionContextExecutor =
materializer.executionContext
implicit val loggingAdapter: LoggingAdapter = system.log
val files = List(SomeFile("aaa"), SomeFile("bbb"), SomeFile("aaa"))
Source(files)
.via(matchFiles("bbb", "aaa"))
.runForeach(println(_))
.andThen({
case Success(_) =>
println("Success")
system.terminate()
case Failure(ex) =>
loggingAdapter.error("Shouldn't happen...", ex)
system.terminate()
})
}
}
case class SomeFile(name: String)
推荐阅读
- php - 获取与 PHP 函数的距离
- fullpage.js - 防止向下滑动以对 Fullpage js 中的鼠标滚轮滚动执行一些其他操作
- ios - 如何在第一个和最后一个colllectionview单元格中提供前导和尾随空间
- javascript -
不支持在使用 componentWillMount 或 componentWillUnMount 时动态更改 `store` - c# - 使用 NServiceBus 时如何防止“System.Transactions.TransactionException”错误
- comments - 在 textwrangler 中阻止评论 Haskell 代码
- java - 材质日历视图高度换行问题
- android - 通过在 Android/iOS 中使用 GeoCoding API 减少 Google 地方信息的使用
- python - 为什么我不能在 Tornado 中同时请求?
- tcl - 获取已编译 tcl 的配置标志