首页 > 解决方案 > 大型数据框上的 Pyspark groupBy

问题描述

我们有一个非常大的 Pyspark Dataframe,我们需要对其执行 groupBy 操作。

我们已经尝试过

df_gp=df.groupBy('some_column').count()

这需要很长时间(它已经运行了超过 17 小时而没有结果)。

我也试过

df_gp=df.groupBy('some_column').agg(count)

但据我所知,行为是相同的。

有关更多上下文:

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()

req=""" SELECT *
        FROM table
        where isodate='2020-07-27'
    """

df = hive.executeQuery(req)

我一直在阅读groupBy不同来源的 Spark,但从我在这里收集的内容来看,Dataframe API 不需要在内存中加载或随机播放键,因此即使在大型 Dataframe 上也不应该成为问题。

我知道groupBy处理如此大量的数据可能需要一些时间,但这确实太多了。我想有一些内存参数可能需要调整,或者我们执行 groupBy 操作的方式可能有问题?

[编辑] 我忘了提到之前在 Dataframe 上处理了一些 UDF groupBy。我试过了 :

所以我们认为 UDF 是问题的真正原因,而不是groupBy

标签: dataframeapache-sparkpysparkapache-zeppelin

解决方案


先说几个神话爆破者

  1. .groupBy('some_column').count()并且.groupBy('some_column').count()是相同的

  2. groupBy导致随机播放,该帖子的意思是它只随机播放必要的列数据(没有在 groupBy 或 agg 函数中未使用的额外列)

    我一直在阅读有关 Spark 的 groupBy 不同来源的信息,但从我在这里收集的内容来看,Dataframe API 不需要在内存中加载或随机播放键,因此即使在大型 Dataframe 上也不应该成为问题。

现在解决你的问题

  1. groupBy如果更多数据被打乱并spark.sql.shuffle.partitions设置为低(默认为 200),则可能需要一些时间。在这种情况下,1 个核心将有大量的混洗数据进行聚合
  2. 如果使用的列groupBy有数据倾斜,也可能需要很长时间,因为它会导致大量数据进入单个执行程序核心

解决方案

  1. 增加到spark.sql.shuffle.partitions更高的值(根据我的经验,应该<amount_of_data_shuffled_in_gb>/100MB确保 1 个核心获得大约 100 MB 的数据进行聚合
  2. 可以通过在数据中引入随机性(加盐)来解决偏差https://dzone.com/articles/why-your-spark-apps-are-slow-or-failing-part-ii-da

推荐阅读