首页 > 解决方案 > spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足

问题描述

我是 spark 新手,没有 Java 编程经验。我正在使用 pyspark 处理一个非常大的时间序列数据集,其中包含接近 4000 个数字(浮点)列和数十亿行。

我想用这个数据集实现以下目标:

时间序列数据以 10 毫秒为间隔。我想按 1s 间隔对数据进行分组,并使用均值作为聚合函数。

这是我用来读取分区镶木地板文件的代码。

df = (spark.read.option("mergeSchema", "true")
           .parquet("/data/"))

这是我编写的 groupby 和聚合代码:

col_list = [... list of numeric columns in the dataframe ...]

agg_funcs = [mean]   # I also want to add other aggregation functions here later.

exprs     = [f(df[c]).alias(f.__name__ + '_' + c) for f in agg_funcs for c in col_list]

result = (df.groupBy(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'])
            .agg(*exprs))

现在,我想将上述结果数据帧写入分区镶木地板:

(result.write.mode("overwrite")
       .partitionBy('Year', 'Month', 'Day', 'Hour', 'Minute', 'Second')
       .parquet('/out/'))

但是,我得到一个 java 堆内存不足错误。

我尝试增加 spark.sql.shuffle.partitions以使每个分区的大小更小,但这没有帮助。

我的火花集群配置:

2 workers + 1 master
Both the worker nodes have 256 GB RAM and 32 cores each.
Master node has 8 cores and 32 GB RAM.

我为我的 spark 作业指定的配置是:

{
    "driverMemory": "8G", 
    "driverCores": 4, 
    "executorMemory": "20G", 
    "executorCores": 4, 
    "numExecutors": 14, 
    "conf": {
        "spark.sql.shuffle.partitions": 2000000
    }
}

以下是 Ambari 关于集群配置的一些截图:

纱线记忆

纱线处理器

有人可以帮我理解为什么会出现内存问题以及如何解决吗?谢谢。

标签: apache-sparkpysparkapache-spark-sqlparquetpyspark-dataframes

解决方案


我相信这是由于数据倾斜而发生的,并且您的一个分区正在发生 OOM。

Spark 的 groupBy() 需要一次将所有键值加载到内存中才能执行 groupby。

增加分区不起作用,因为您可能拥有具有相似分组的大数据。按键检查您是否有类似组的数据倾斜。

查看这篇文章,它更好地解释了这一点。


推荐阅读