scala - Spark 内存不足异常
问题描述
我每天收到 10 GB 包含员工详细信息的文件。需要从前一天和当天文件中选择最新记录。例如:8 月 6 日和 8 月 7 日的文件需要在时间戳列上进行比较并选择最新记录
8 月 6 日文件
emp-id name dept phone-No time-Stamp 1 Jhon Sales 817234518 12-6-2019 2 Marry Production 927234565 4-3-2019 3 James Marketing 625234522 21-1-2019
8 月 7 日文件
emp-id name dept phone-No time-Stamp 1 Jhon Sales 817234518 12-7-2019 4 Jerry Sales 653214442 12-7-2019 3 James Marketing 625234522 2-6-2019
预期产出
emp-id name dept phone-No time-Stamp 1 Jhon Sales 817234518 12-7-2019 2 Marry Production 927234565 4-3-2019 3 James Marketing 625234522 2-5-2019 4 Jerry Sales 653214442 12-7-2019
我尝试了以下解决方案并得到了预期的结果。
val mergedDF = currentDayDF.union(previousDayDF)
mergedDF.show(false)
val windowSpec = Window.partitionBy("emp-id").orderBy(col("timeStamp").desc)
val latestForEachKey = mergedDF.withColumn("rank", rank().over(windowSpec))
.filter(col("rank") === 1)
.drop("rank")
问题
每天的输入文件大小为 10 GB,如果集群内存(执行器总内存)小于 20 GB 来加载两个数据集(前一天和当天)会引发 Out of Memory 异常怎么办?
我认为,spark 将大文件划分为要处理的分区,因此一开始只有少数分区被加载到执行程序内存中,应用转换并将中间结果数据集保存到辅助内存,然后继续处理剩余的分区。但是分区需要所有数据分区作为其更广泛的转换,我的猜测是错误的。那么 spark 会抛出 OOM 异常吗?
解决方案
分区用于并行执行。Spark 将尝试在所有可用分区中同时加载所有 20GB 数据。如果创建分区的所有 executor 的内存总和小于 20 GB,则会抛出内存不足错误
推荐阅读
- javascript - 查找函数参数的数量及其名称
- javascript - Javascript - 将回调函数作为参数传递,可以接受任意数量的帐户
- r - 在 R mlogit 包中使用字符时出现计算奇点错误
- c++ - 如何在二叉树类中正确重载运算符 << ?
- python - xlWings 将字符串返回给函数
- angular - 自定义输入绑定角度 5
- javascript - 具有相同选项的多项选择
- python - 卡在 Python 中的 Pong 游戏
- algorithm - 有没有办法比较两个频谱图?
- sql - 如何在 SQLALCHEMY 中获取复杂的 Oracle UserDefinedType 作为存储过程的输出