首页 > 解决方案 > Spark 内存不足异常

问题描述

我每天收到 10 GB 包含员工详细信息的文件。需要从前一天和当天文件中选择最新记录。例如:8 月 6 日和 8 月 7 日的文件需要在时间戳列上进行比较并选择最新记录

我尝试了以下解决方案并得到了预期的结果。

 val mergedDF = currentDayDF.union(previousDayDF)

mergedDF.show(false)

val windowSpec = Window.partitionBy("emp-id").orderBy(col("timeStamp").desc)
val latestForEachKey = mergedDF.withColumn("rank", rank().over(windowSpec))
                               .filter(col("rank") === 1)
                               .drop("rank")

问题

  1. 每天的输入文件大小为 10 GB,如果集群内存(执行器总内存)小于 20 GB 来加载两个数据集(前一天和当天)会引发 Out of Memory 异常怎么办?

    我认为,spark 将大文件划分为要处理的分区,因此一开始只有少数分区被加载到执行程序内存中,应用转换并将中间结果数据集保存到辅助内存,然后继续处理剩余的分区。但是分区需要所有数据分区作为其更广泛的转换,我的猜测是错误的。那么 spark 会抛出 OOM 异常吗?

标签: scalaapache-spark

解决方案


分区用于并行执行。Spark 将尝试在所有可用分区中同时加载所有 20GB 数据。如果创建分区的所有 executor 的内存总和小于 20 GB,则会抛出内存不足错误


推荐阅读