apache-spark - spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足
问题描述
我是 spark 新手,没有 Java 编程经验。我正在使用 pyspark 处理一个非常大的时间序列数据集,其中包含接近 4000 个数字(浮点)列和数十亿行。
我想用这个数据集实现以下目标:
时间序列数据以 10 毫秒为间隔。我想按 1s 间隔对数据进行分组,并使用均值作为聚合函数。
这是我用来读取分区镶木地板文件的代码。
df = (spark.read.option("mergeSchema", "true")
.parquet("/data/"))
这是我编写的 groupby 和聚合代码:
col_list = [... list of numeric columns in the dataframe ...]
agg_funcs = [mean] # I also want to add other aggregation functions here later.
exprs = [f(df[c]).alias(f.__name__ + '_' + c) for f in agg_funcs for c in col_list]
result = (df.groupBy(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'])
.agg(*exprs))
现在,我想将上述结果数据帧写入分区镶木地板:
(result.write.mode("overwrite")
.partitionBy('Year', 'Month', 'Day', 'Hour', 'Minute', 'Second')
.parquet('/out/'))
但是,我得到一个 java 堆内存不足错误。
我尝试增加 spark.sql.shuffle.partitions
以使每个分区的大小更小,但这没有帮助。
我的火花集群配置:
2 workers + 1 master
Both the worker nodes have 256 GB RAM and 32 cores each.
Master node has 8 cores and 32 GB RAM.
我为我的 spark 作业指定的配置是:
{
"driverMemory": "8G",
"driverCores": 4,
"executorMemory": "20G",
"executorCores": 4,
"numExecutors": 14,
"conf": {
"spark.sql.shuffle.partitions": 2000000
}
}
以下是 Ambari 关于集群配置的一些截图:
有人可以帮我理解为什么会出现内存问题以及如何解决吗?谢谢。
解决方案
我相信这是由于数据倾斜而发生的,并且您的一个分区正在发生 OOM。
Spark 的 groupBy() 需要一次将所有键值加载到内存中才能执行 groupby。
增加分区不起作用,因为您可能拥有具有相似分组的大数据。按键检查您是否有类似组的数据倾斜。
推荐阅读
- php - 你如何使 symfony make:crud 可用于 API?
- java - Java 中的 CopyOnWriteArrayList 与 AtomicArrays
- tsql - SQL Server 能否在不涉及任何错误的上下文中向 .NET System.Data.SqlClient 程序返回信息性消息
- syntax-error - 输入结束时出现 PostgresQL 语法错误,我该如何解决这个问题?
- matlab - 有没有办法使用 Matlab 将 .RAW 音频文件转换为 .wav?
- c++ - typename参数包和auto参数包的区别?
- c++ - 我应该内联命名空间范围 lambdas 吗?无论哪种情况,为什么?
- python - Sympy/Ipython 何时自动为显示下标系数?
- wordpress - Swiper Slider 在标签 wordpress elementor 中不起作用
- python - 我一直在尝试打印图像,其中图像在另一个文件夹中,图像的 ID 在另一个 csv 文件夹中