scala - 如何在scala fs2中处理大量数据?
问题描述
我们有一个 Scala 实用程序,它从数据库读取数据,然后使用 fs2 库将数据写入 csv 格式的文本文件。然后它对几列进行一些处理并创建最终文件。所以这是一个两步的过程。
- 从 db 读取数据并创建一个 data_tmp csv 文件。
- 处理 _tmp 文件中的几列并创建最终文件 data_final csv 文件。
我们使用类似于链接的代码: https ://levelup.gitconnected.com/how-to-write-data-processing-application-in-fs2-2b6f84e3939c
Stream.resource(Blocker[IO]).flatMap { blocker =>
val inResource = getClass.getResource(in) // data_tmp file location
val outResource = getClass.getResource(out) // data_final file location
io.file
.readAll[IO](Paths.get(inResource.toURI), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
..... // our processing logic here
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get(outResource.toURI), blocker))
}
到目前为止,这曾经有效,因为我们没有超过 5k 条记录。
现在我们有一个新的需求,我们期望从查询到 db 的数据在 50k 到 1000k 的范围内。
所以我们要创建多个 data_final 文件,比如 data_final_1、data_final_2、...等等。
每个输出文件不应超过特定大小,比如说 2 MB。
所以 data_final 应该以 2 MB 的块创建。
请帮我修改上面的代码片段,以便我们可以从单个大型 data_tmp csv 文件创建多个输出文件。
解决方案
推荐阅读
- python - 使用scrapy管道写入文件
- docker - 如何使用调度程序(cron)容器在其他容器中执行命令
- mysql - 创建视图连接具有相同列且不重复的两个表
- oracle - Hibernate query to get Data from Oracle table inserted in last 5 mins
- c# - Two order bys then picking the last element in linq
- sql-server - 服务器名称更改后如何使用相同的 SQL 连接字符串?
- opengl - 这个 gltf 文件中的框的索引是如何工作的?
- html - 创建响应式网格布局,其中所有元素在垂直和水平方向上彼此等距
- php - Where does CodeIgniter place newly created webpages?
- python - 如何在项目后期以不同的形式更新模型中的字段?