scala - "tee" Scala 流/迭代器
问题描述
我有一个表示为简单迭代器(或流)的顺序数据源。数据很大,不适合内存。此外,源可以遍历一次,并且获取成本很高。此源用于一些以迭代器(或流)作为参数以线性消费数据的繁重过程(黑盒)。好的,这很简单。但是,如果我有两个不同的此类消费程序,我该怎么办?正如我所说,我不想将输入数据吸入 List 之类的集合中。我也可以通过从一开始就重新阅读源两次来完成我的任务,但我不喜欢这样,因为它没有效果。如果事实上我需要“tee”(一种克隆)迭代器(或流),以通过两个并行进程消耗单个两次,而不将其缓存到内存集合中。我想如果这种方法消耗源流太快,应该做背压或者更确切地说限制兄弟姐妹。有效的解决方案可能应该有一些并行安全的队列缓冲区。有谁知道如何在 Scala 上进行这样的事情(或使用任何外部流库/框架)?
PS 我发现了一个 4 年前的类似问题: 一个上游流为多个下游流提供数据 不同之处在于我询问如何使用标准 Scala 迭代器(或流)或更好的一些现有库来执行它。
解决方案
您应该查看fs2 流。该示例使用常量内存以增量方式读取文件并写入另一个文件。这是您如何修改他们的示例以写入两个文件的方法:
...
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.observe(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
.through(io.file.writeAll(Paths.get("testdata/celsius2.txt"), blockingEC))
...
推荐阅读
- python - 如何替换列表中的特定值?
- javascript - Formik 在有效字段更改后调度 Redux 操作
- deep-learning - 我可以在进行深度学习时使用来自不同来源的数据集吗
- php - 未捕获的 PHP 异常 Symfony\Component\HttpKernel\Exception\NotFoundHttpException:“没有为“GET /”找到路由
- c++ - 带有 std::tuple_cat 的模板实例化 decltype 和 declval
- javascript - 使用来自 JSON 对象的数据填充具有多条线的 Chart.Js 折线图
- javascript - 使用 Typescript 在 Webpack 编译期间动态导入模块
- r - 计算 R 中具有特定名称模式的列的行均值
- github - 如何获取已合并到 master 的 pull request 列表?
- javascript - 如何在firebase web中获得新添加的孩子?