apache-spark - Apache Spark 重新分区/分桶最佳实践以避免数据倾斜
问题描述
我专注于对数据帧进行分桶和重新分区,以将数据分组在一起以加快以后的处理速度。
使用分桶,目的是将数据拆分为固定数量的“桶”,分布在一组列上。目标是将分区数据拆分为一组相等的桶。
我确信您已经收集到总体目标是使用此方法来防止数据倾斜。
以下是我为实现分桶而采用的分步过程。
请注意,我使用的是 Spark SQL 而不是 PySpark,但原理是相同的。例如,我使用名为 NTILE 的函数来存储数据。该函数取自典型的 T-SQL。
无论如何,我们开始:
我有以下两个数据框
df_table1 = spark.read.csv("/tamingskew/table1/", header=True)
df_table2 = spark.read.csv("/tamingskew/table2/", header=True)
Apache Spark 采用数据帧并为每个数据帧创建一个分区,如以下代码所示:
df_table1.rdd.getNumPartitions()
1
然后我创建两个表来执行查询以生成分桶数据:
df_table1.createOrReplaceTempView("t1")
df_table2.createOrReplaceTempView("t2")
现在,要对第一个表 't1' 进行一些分桶和重新分区
df_pt1 = spark.sql("""SELECT
t1.*
,NTILE(8) OVER (ORDER BY t1.registration) AS newpart1
FROM t1
ORDER BY newpart1 DESC
""").repartition(8, col("newpart1"))
从上面可以看出,我已经将数据分桶到 8 个桶中,并根据分桶数据“newpart1”将数据重新分区为 8 个分区
然后我执行以下代码来查看分区数据的样子
print("Number of partitions: {}".format(df_pt1.rdd.getNumPartitions()))
print('Partitioning distribution: '+ str(df_pt1.rdd.glom().map(len).collect()))
它如下所示:
Number of partitions: 8
Partitioning distribution: [0, 1250, 1250, 3750, 0, 0, 2500, 1250]
上面的输出并不理想,因为如您所见,有 3 个分区根本没有任何数据。
然后,我从上面的分桶数据框中创建一个表以用于我的最终查询:
df_pt1.createOrReplaceTempView('t11')
然后我对 table2 做同样的事情(不再详细介绍)
df_pt2 = spark.sql("""SELECT
t2.*
,NTILE(8) OVER (ORDER BY t2.sale_price) AS newpart2
FROM t2
ORDER BY newpart2 DESC
""").repartition(8, col("newpart2"))
print("Number of partitions: {}".format(df_pt2.rdd.getNumPartitions()))
print('Partitioning distribution: '+ str(df_pt2.rdd.glom().map(len).collect()))
Number of partitions: 8
Partitioning distribution: [0, 12500, 12500, 37500, 0, 0, 25000, 12500]
df_pt2.createOrReplaceTempView('t22')
最终查询如下:
querywithgroups = spark.sql("""SELECT
t11.registration
,AVG(t22.sale_price) AS average_price
FROM t11
INNER JOIN t22
ON t11.make = t22.make
AND t11.model = t22.model
WHERE ABS(t22.engine_size - t11.engine_size) <= 0.1
GROUP BY t11.registration
,t11.newpart1
,t22.newpart2
""")
这提供了以下拆分分区:
Number of partitions: 8
Partitioning distribution: [0, 10000, 10000, 30000, 0, 0, 20000, 10000]
我的问题是:有没有更好的方法使用分桶对数据进行分区以加快查询速度?以上可以改进吗。
任何想法都非常感谢。