首页 > 解决方案 > FS2 中的块文件轮换

问题描述

我有一大串字符串。现在我想将这些块写入文件。在每个文件中我想写 N 个块。我写了下面的代码来解决这个问题。但我的方法肯定不是惯用的。

object Diff extends App{

  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val l: Stream[Pure, (Chunk[String], Int)] =
    Stream("Jakob", "Leo", "Tom", "Anton", "Lavinia", "Daniel")
      .intersperse("\r\n")
    .chunkN(4)
    .zip(Stream(1,2,3,4))


    val writeJob: Stream[Pure, ExitCode] = l.map {
      case (ch, i) =>
        val path = Paths.get(s"file_$i")
        Stream.resource(Blocker[IO]).flatMap { blocker =>
        Stream.chunk(ch)
          .covary[IO]
          .through(text.utf8Encode)
          .through(file.writeAll(path, blocker))
    }.compile.drain.as(ExitCode.Success).unsafeRunSync()
  }

  writeJob.compile.drain.as(ExitCode.Success)
}

1)所以我unsafeRunSync()不止一次打电话这是一个缺陷(因为地图被多次调用)。但我不知道如何以另一种方式解决它。我是 FS2 和 Cats-Effects 的新手。

2)另外,我在chunkN(4)这里打电话。在我的例子中,这看起来是无害的。但在我的实际用例中,我会使用chunkN(10 ^ 6). 我是否Chunks从文档中正确理解 FS2 会在内存中累积 10^6 块,然后将它们发送到下游?

我知道有一个 fileRotate 函数,它可以和 Bytes 一样工作,limit但我认为它在我的用例中并没有真正的帮助。

谢谢

标签: scalascala-catsfs2

解决方案


所以,这个问题已经快一年半了,但我希望这仍然有用。

我是否正确理解文档中的块,即 FS2 会在内存中累积 10^6 块,然后将它们发送到下游?

是的。

回顾一些 FS2 概念: aStream[F, O]是一种延迟生成的F效果(类似于程序),它产生许多类型的元素O。现在,这些元素不是“一个接一个”地产生的,而是以称为块的小元素数组的形式产生的。

  • 流在块内级别是严格的,即每个块作为一个整体成功生成,或者流失败。这意味着要访问第一个元素,必须在内存中分配整个第一个块。

  • 另一方面,流在块间级别是惰性的,这意味着流的编译只会在给定时间分配流的“消费者”需要的块,但其他块会在下线尚未加载,因此尚未在内存中。

chunkN方法用于更改流的“块结构”,因此可以将小块连接成较大的块,或者将太大的块分割成较小的块。在示例中,调用str.chunkN(1_000_000)将返回一个流,该流在编译后将从str流中提取所需数量的块,直到从中获取一百万个元素,然后在单个块中发出这一百万个元素.


推荐阅读