dataframe - 大型数据框上的 Pyspark groupBy
问题描述
我们有一个非常大的 Pyspark Dataframe,我们需要对其执行 groupBy 操作。
我们已经尝试过
df_gp=df.groupBy('some_column').count()
这需要很长时间(它已经运行了超过 17 小时而没有结果)。
我也试过
df_gp=df.groupBy('some_column').agg(count)
但据我所知,行为是相同的。
有关更多上下文:
- 我们在 Zeppelin(版本 0.8.0)上运行此操作,使用 %spark2.pyspark 解释器
- Zeppelin 在 Yarn 客户端上运行
- 数据存储在 Hive (Hive 3.1.0.3.1.0.0-78)
- 初始 Dataframe 是通过使用 llap 查询 Hive 创建的:
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
req=""" SELECT *
FROM table
where isodate='2020-07-27'
"""
df = hive.executeQuery(req)
- 数据框大小约为 6000 万行,9 列
- 在同一环境中对同一 Dataframe 执行的其他操作,例如
count()
或cache()
在一分钟内完成
我一直在阅读groupBy
不同来源的 Spark,但从我在这里收集的内容来看,Dataframe API 不需要在内存中加载或随机播放键,因此即使在大型 Dataframe 上也不应该成为问题。
我知道groupBy
处理如此大量的数据可能需要一些时间,但这确实太多了。我想有一些内存参数可能需要调整,或者我们执行 groupBy 操作的方式可能有问题?
[编辑] 我忘了提到之前在 Dataframe 上处理了一些 UDF groupBy
。我试过了 :
groupBy
在没有 UDF 的大型 Dataframe 上:在不到一分钟的时间内给出结果groupBy
在处理后的数据帧样本上:与以前相同的问题
所以我们认为 UDF 是问题的真正原因,而不是groupBy
解决方案
先说几个神话爆破者
.groupBy('some_column').count()
并且.groupBy('some_column').count()
是相同的groupBy
导致随机播放,该帖子的意思是它只随机播放必要的列数据(没有在 groupBy 或 agg 函数中未使用的额外列)我一直在阅读有关 Spark 的 groupBy 不同来源的信息,但从我在这里收集的内容来看,Dataframe API 不需要在内存中加载或随机播放键,因此即使在大型 Dataframe 上也不应该成为问题。
现在解决你的问题
groupBy
如果更多数据被打乱并spark.sql.shuffle.partitions
设置为低(默认为 200),则可能需要一些时间。在这种情况下,1 个核心将有大量的混洗数据进行聚合- 如果使用的列
groupBy
有数据倾斜,也可能需要很长时间,因为它会导致大量数据进入单个执行程序核心
解决方案
- 增加到
spark.sql.shuffle.partitions
更高的值(根据我的经验,应该<amount_of_data_shuffled_in_gb>/100MB
确保 1 个核心获得大约 100 MB 的数据进行聚合 - 可以通过在数据中引入随机性(加盐)来解决偏差https://dzone.com/articles/why-your-spark-apps-are-slow-or-failing-part-ii-da
推荐阅读
- excel - Excel VBA 复制范围(左侧的所有单元格)到嵌入的 Word 文档
- bash - 从脚本变量中提取文件名
- reactjs - 在 HTTPS 而不是 HTTP 中启动反应应用程序
- jquery - 点击甚至不适用于动态生成的元素
- sql-server - 为什么一个特定的 SQL Server 数据库无法将 VARCHAR 转换为 NUMERIC?
- sql-server - 访问连接后创建的新表
- npm - 我无法让 Node.js 安装当前版本(Windows)
- c# - 如何获取 [Authorize] 重定向到登录页面的原始 URL?
- javascript - 单击时更改生成的 div 的样式属性
- java - spring转换器接口如何保证实现的线程安全