scala - 从数据库加载大量记录时内存不足
问题描述
我在 Akka Streams 中使用 slick 从数据库(postgresql)加载大量记录(~2M)并将它们写入 S3 文件。但是,我注意到我下面的代码适用于大约 50k 的记录,但对于超过 100k 标记的任何记录都失败了。
val allResults: Future[Seq[MyEntityImpl]] =
MyRepository.getAllRecordss()
val results: Future[MultipartUploadResult] = Source
.fromFuture(allResults)
.map(seek => seek.toList)
.mapConcat(identity)
.map(myEntity => myEntity.toPSV + "\n")
.map(s => ByteString(s))
.runWith(s3Sink)
以下是myEntity
外观示例:
case class MyEntityImpl(partOne: MyPartOne, partTwo: MyPartTwo) {
def toPSV: String = myPartOne.toPSV + myPartTwo.toPSV
}
case class MyPartOne(field1: String, field2: String) {
def toPSV: String = {s"$field1|"+s"$field2"}
}
case class MyPartOne(field1: String, field2: String) {
def toPSV: String = {s"$field1|"+s"$field2"}
}
我正在寻找一种以更具反应性的方式执行此操作的方法,以免内存不足。
解决方案
根本问题
问题是在将它们分派到s3Sink
.
数据被拉入内存的第一个位置可能是您的MyRepository.getAllRecords()
方法。大多数(如果不是全部)Seq
实现都是基于内存的。您肯定使用本地内存的第二个地方是seek.toList
因为 aList
将所有数据存储在内存中。
解决方案
而不是Seq
从getAllRecords
你那里返回 a 应该直接返回一个基于光滑的Source
akka。这将确保您的物化流在进入 s3 之前只需要用于瞬态处理步骤的内存。
如果您的方法定义更改为:
def getAllRecords() : Source[MyEntityImpl, _]
然后流的其余部分将以反应方式运行:
MyRepository
.getAllRecords()
.map(myEntity => myEntity.toPSV + "\n")
.map(ByteString.apply)
.runWith(s3Sink)
推荐阅读
- php - 如何将电报机器人连接到 vps(Windows 服务器)?
- c# - out 关键字:C# 到 python 的翻译
- makefile - 带有 % 的 Makefile 目标模式
- tensorflow - 在本地加载 GCMLE 并获取激活
- azure - 无法更改 Hive 仓库目录
- regex - RegExp_Extract 在 Google Data Studio 中第一个点之前的所有内容
- express - Handlebars 和 JWT 令牌到本地存储
- c - 使用 + 检查多个指针是否都为 NULL
- java - 用于对 Java 类进行基准测试的 JMH 与 JMeter?
- javascript - 为什么这个函数返回一个函数?