首页 > 解决方案 > 使用数据帧上的许多过滤条件创建 Spark 作业的时间很长

问题描述

我有一个带有形状的 PySpark 数据框(1e10, 14),我想用大约 50 个复合 OR 语句对其进行过滤,即:

sql_string = "
(col1='val1' and col2=5) or 
(col1='val2' and col2=7) or
(col1='val3' and col2=5) or
...
"
df_f = df.filter(sql_string)
df_f.limit(1000).show()

如果这些单个 OR 语句的数量 < 10,则立即创建 show 方法的 Spark 作业。
但是,对于大约 15 个 OR,创建 Spark 作业已经需要大约 30 秒。
在大约 20 个 OR 时,创建任何 Spark 作业的时间变得难以管理(超过几个小时)。

从大约 15 个 OR 开始,每隔几秒就会显示一次 GC 分配消息,即:

2020-05-04T09:55:50.762+0000: [GC (Allocation Failure) [PSYoungGen: 7015644K->1788K(7016448K)] 7266861K->253045K(21054976K), 0.0063209 secs] [Times: user=0.02 sys=0.00, real=0.01 secs] 

因此,似乎正在发生一些时髦的事情。当一个人在 Spark Dataframes 上循环时,感觉类似于这个问题?

该驱动程序具有 32GB RAM(已使用 10G)和 4 个内核(1 个内核 100% 已使用,其他内核接近 0%)。I/O 几乎为零。
尽管一个核心的使用率为 100%,但集群认为它处于非活动状态,因为它在我设置的非活动时间后关闭。

这是执行计划的链接:https ://pastebin.com/7MEv5Sq2 。

标签: apache-sparkpyspark

解决方案


在这种情况下,您使用复合 OR 语句基于多个硬编码值过滤数据帧,因此 spark 催化剂优化器还必须逐个检查每个过滤器并在执行每个 OR 语句后加载完整的数据帧。

因此,当我们缓存数据帧时,它已经在内存中,因此通过将缓存的数据帧传递给所有执行程序来更快地执行它。

对于大型数据帧,您可以尝试在内存和磁盘上持久化,这应该可以提高您所寻求的性能,但如果这不起作用,您可以通过 col1 过滤数据帧然后在 col2 上过滤已经过滤的数据帧来改进查询。这将需要您实现一些基于逻辑的方法,以最大限度地减少对大数据的迭代。

希望能帮助到你。


推荐阅读