首页 > 解决方案 > "tee" Scala 流/迭代器

问题描述

我有一个表示为简单迭代器(或流)的顺序数据源。数据很大,不适合内存。此外,源可以遍历一次,并且获取成本很高。此源用于一些以迭代器(或流)作为参数以线性消费数据的繁重过程(黑盒)。好的,这很简单。但是,如果我有两个不同的此类消费程序,我该怎么办?正如我所说,我不想将输入数据吸入 List 之类的集合中。我也可以通过从一开始就重新阅读源两次来完成我的任务,但我不喜欢这样,因为它没有效果。如果事实上我需要“tee”(一种克隆)迭代器(或流),以通过两个并行进程消耗单个两次,而不将其缓存到内存集合中。我想如果这种方法消耗源流太快,应该做背压或者更确切地说限制兄弟姐妹。有效的解决方案可能应该有一些并行安全的队列缓冲区。有谁知道如何在 Scala 上进行这样的事情(或使用任何外部流库/框架)?

PS 我发现了一个 4 年前的类似问题: 一个上游流为多个下游流提供数据 不同之处在于我询问如何使用标准 Scala 迭代器(或流)或更好的一些现有库来执行它。

标签: scalastreamtee

解决方案


您应该查看fs2 流。该示例使用常量内存以增量方式读取文件并写入另一个文件。这是您如何修改他们的示例以写入两个文件的方法:

...

io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
  .through(text.utf8Decode)
  .through(text.lines)
  .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
  .map(line => fahrenheitToCelsius(line.toDouble).toString)
  .intersperse("\n")
  .through(text.utf8Encode)
  .observe(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
  .through(io.file.writeAll(Paths.get("testdata/celsius2.txt"), blockingEC))

...

推荐阅读