首页 > 解决方案 > Spark join:对同一分区中的特定列具有相同值的记录分组

问题描述

我们有 2 个 Hive 表,它们在 spark 中读取并使用连接键连接,我们称之为 user_id。然后,我们将这个连接的数据集写入 S3 并将其注册为第三张表,以便后续任务使用这个连接的数据集。连接数据集中的其他列之一称为 keychain_id。

我们希望将属于同一 keychain_id 的所有用户记录分组在同一个分区中,以避免以后出现洗牌。那么,我可以在写入 s3 并将其注册到 Hive 之前进行重新分区(“keychain_id”)吗,当我从第三个表中读取相同的数据时,它仍然具有相同的分区分组(所有用户属于相同的 keychain_id在同一个分区中)?因为试图避免每次从第三个表读取时都进行重新分区(“keychain_id”)。你能澄清一下吗?如果不能保证它在读取时会保留相同的分区分组,那么除了缓存之外还有另一种有效的方法吗?

标签: apache-sparkapache-spark-sql

解决方案


如果 keychain_id 中没有数据倾斜(将导致分区文件大小不同),您可以使用 partitionBy 进行写入:

 df.write\
 .partitionBy("keychain_id")\
 .mode("overwrite")\
 .format("parquet")\
 .saveAsTable("testing")

更新:

为了“在同一数据帧分区中保留具有相同 keychain_id 的用户记录的分组”

您可以在唯一的 ID 和/或列上重新分区

from pyspark.sql import functions as F
n = df.select(F.col('keychain_id')).distinct().count()

df.repartition(n, F.col("keychain_id)\
 .write \
 .partitionBy("keychain_id")\
 .mode("overwrite")\
 .format("parquet")\
 .saveAsTable("testing")

 or 

df.repartition(n)\
 .write \
 .partitionBy("keychain_id")\
 .mode("overwrite")\
 .format("parquet")\
 .saveAsTable("testing")

推荐阅读