首页 > 解决方案 > 如何在scala fs2中处理大量数据?

问题描述

我们有一个 Scala 实用程序,它从数据库读取数据,然后使用 fs2 库将数据写入 csv 格式的文本文件。然后它对几列进行一些处理并创建最终文件。所以这是一个两步的过程。

  1. 从 db 读取数据并创建一个 data_tmp csv 文件。
  2. 处理 _tmp 文件中的几列并创建最终文件 data_final csv 文件。

我们使用类似于链接的代码: https ://levelup.gitconnected.com/how-to-write-data-processing-application-in-fs2-2b6f84e3939c

Stream.resource(Blocker[IO]).flatMap {  blocker =>
  val inResource = getClass.getResource(in) // data_tmp file location
  val outResource = getClass.getResource(out) // data_final file location
  io.file
    .readAll[IO](Paths.get(inResource.toURI), blocker, 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    ..... // our processing logic here
    .through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get(outResource.toURI), blocker))
  
}

到目前为止,这曾经有效,因为我们没有超过 5k 条记录。

现在我们有一个新的需求,我们期望从查询到 db 的数据在 50k 到 1000k 的范围内。

所以我们要创建多个 data_final 文件,比如 data_final_1、data_final_2、...等等。

每个输出文件不应超过特定大小,比如说 2 MB。

所以 data_final 应该以 2 MB 的块创建。

请帮我修改上面的代码片段,以便我们可以从单个大型 data_tmp csv 文件创建多个输出文件。

标签: scalascala-catsfs2

解决方案


推荐阅读