scala - FS2 中的块文件轮换
问题描述
我有一大串字符串。现在我想将这些块写入文件。在每个文件中我想写 N 个块。我写了下面的代码来解决这个问题。但我的方法肯定不是惯用的。
object Diff extends App{
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val l: Stream[Pure, (Chunk[String], Int)] =
Stream("Jakob", "Leo", "Tom", "Anton", "Lavinia", "Daniel")
.intersperse("\r\n")
.chunkN(4)
.zip(Stream(1,2,3,4))
val writeJob: Stream[Pure, ExitCode] = l.map {
case (ch, i) =>
val path = Paths.get(s"file_$i")
Stream.resource(Blocker[IO]).flatMap { blocker =>
Stream.chunk(ch)
.covary[IO]
.through(text.utf8Encode)
.through(file.writeAll(path, blocker))
}.compile.drain.as(ExitCode.Success).unsafeRunSync()
}
writeJob.compile.drain.as(ExitCode.Success)
}
1)所以我unsafeRunSync()
不止一次打电话这是一个缺陷(因为地图被多次调用)。但我不知道如何以另一种方式解决它。我是 FS2 和 Cats-Effects 的新手。
2)另外,我在chunkN(4)
这里打电话。在我的例子中,这看起来是无害的。但在我的实际用例中,我会使用chunkN(10 ^ 6)
. 我是否Chunks
从文档中正确理解 FS2 会在内存中累积 10^6 块,然后将它们发送到下游?
我知道有一个 fileRotate 函数,它可以和 Bytes 一样工作,limit
但我认为它在我的用例中并没有真正的帮助。
谢谢
解决方案
所以,这个问题已经快一年半了,但我希望这仍然有用。
我是否正确理解文档中的块,即 FS2 会在内存中累积 10^6 块,然后将它们发送到下游?
是的。
回顾一些 FS2 概念: aStream[F, O]
是一种延迟生成的F
效果(类似于程序),它产生许多类型的元素O
。现在,这些元素不是“一个接一个”地产生的,而是以称为块的小元素数组的形式产生的。
流在块内级别是严格的,即每个块作为一个整体成功生成,或者流失败。这意味着要访问第一个元素,必须在内存中分配整个第一个块。
另一方面,流在块间级别是惰性的,这意味着流的编译只会在给定时间分配流的“消费者”需要的块,但其他块会在下线尚未加载,因此尚未在内存中。
该chunkN
方法用于更改流的“块结构”,因此可以将小块连接成较大的块,或者将太大的块分割成较小的块。在示例中,调用str.chunkN(1_000_000)
将返回一个流,该流在编译后将从str
流中提取所需数量的块,直到从中获取一百万个元素,然后在单个块中发出这一百万个元素.
推荐阅读
- haskell - “:”运算符的 Haskell 类型问题
- powershell - 使用 pnp powershell 从现有 SharePoint 网站 url 创建 MS Teams
- java - 迭代时向 HashSet 添加元素
- amazon-web-services - 在 aws cloudsearch 中提取错误的结果
- sql - 在 CASE 语句中的 THEN 之后添加 OR 的方法?
- azure - Azure 前端 Web APP 和 Azure APIM 相互客户端证书身份验证不起作用
- java - spring-data-hazelcast @Query 注释给出 NullPointerException
- swift - 期望 SwiftUI DynamicProperty 属性包装器的内部更新来触发视图更新是否正确?
- javascript - 谷歌脚本:变量总是显示未定义
- c - 如何在c中正确扫描txt文件中的东西