首页 > 解决方案 > Apache Spark 重新分区/分桶最佳实践以避免数据倾斜

问题描述

我专注于对数据帧进行分桶和重新分区,以将数据分组在一起以加快以后的处理速度。

使用分桶,目的是将数据拆分为固定数量的“桶”,分布在一组列上。目标是将分区数据拆分为一组相等的桶。

我确信您已经收集到总体目标是使用此方法来防止数据倾斜。

以下是我为实现分桶而采用的分步过程。

请注意,我使用的是 Spark SQL 而不是 PySpark,但原理是相同的。例如,我使用名为 NTILE 的函数来存储数据。该函数取自典型的 T-SQL。

无论如何,我们开始:

我有以下两个数据框

df_table1 = spark.read.csv("/tamingskew/table1/", header=True)
df_table2 = spark.read.csv("/tamingskew/table2/", header=True)

Apache Spark 采用数据帧并为每个数据帧创建一个分区,如以下代码所示:

df_table1.rdd.getNumPartitions()

1

然后我创建两个表来执行查询以生成分桶数据:

df_table1.createOrReplaceTempView("t1")
df_table2.createOrReplaceTempView("t2")

现在,要对第一个表 't1' 进行一些分桶和重新分区

df_pt1 = spark.sql("""SELECT
  t1.*
 ,NTILE(8) OVER (ORDER BY t1.registration) AS newpart1
FROM t1
ORDER BY newpart1 DESC
""").repartition(8, col("newpart1"))

从上面可以看出,我已经将数据分桶到 8 个桶中,并根据分桶数据“newpart1”将数据重新分区为 8 个分区

然后我执行以下代码来查看分区数据的样子

print("Number of partitions: {}".format(df_pt1.rdd.getNumPartitions())) 
print('Partitioning distribution: '+ str(df_pt1.rdd.glom().map(len).collect()))

它如下所示:

Number of partitions: 8
Partitioning distribution: [0, 1250, 1250, 3750, 0, 0, 2500, 1250]

上面的输出并不理想,因为如您所见,有 3 个分区根本没有任何数据。

然后,我从上面的分桶数据框中创建一个表以用于我的最终查询:

df_pt1.createOrReplaceTempView('t11')

然后我对 table2 做同样的事情(不再详细介绍)

df_pt2 = spark.sql("""SELECT
  t2.*
 ,NTILE(8) OVER (ORDER BY t2.sale_price) AS newpart2
FROM t2
ORDER BY newpart2 DESC
""").repartition(8, col("newpart2"))


print("Number of partitions: {}".format(df_pt2.rdd.getNumPartitions())) 
print('Partitioning distribution: '+ str(df_pt2.rdd.glom().map(len).collect()))

Number of partitions: 8
Partitioning distribution: [0, 12500, 12500, 37500, 0, 0, 25000, 12500]

df_pt2.createOrReplaceTempView('t22')

最终查询如下:

querywithgroups = spark.sql("""SELECT
  t11.registration
 ,AVG(t22.sale_price) AS average_price
FROM t11
INNER JOIN t22
  ON t11.make = t22.make
    AND t11.model = t22.model
WHERE ABS(t22.engine_size - t11.engine_size) <= 0.1
GROUP BY t11.registration
        ,t11.newpart1
        ,t22.newpart2
""")

这提供了以下拆分分区:

Number of partitions: 8
Partitioning distribution: [0, 10000, 10000, 30000, 0, 0, 20000, 10000]

我的问题是:有没有更好的方法使用分桶对数据进行分区以加快查询速度?以上可以改进吗。

任何想法都非常感谢。

标签: apache-sparkapache-spark-sql

解决方案


推荐阅读