首页 > 解决方案 > 对 pyspark 中的列进行重新分区如何影响分区数?

问题描述

我有一个包含一百万条记录的数据框。看起来像这样 -

df.show()

+--------------------+--------------------++-------------
|            feature1|            feature2| domain    |
+--------------------+--------------------++-------------
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1   | 
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain2   |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1   |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain2   |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1   |

Spark 中的理想分区大小为 128 MB,假设域列有两个唯一值(域 1 和域 2),考虑到这一点,我有两个问题 -

  1. 如果我这样做df.repartition("domain")并且如果一个分区无法容纳特定域密钥的所有数据,应用程序会失败还是会根据数据自动创建合适的分区?

  2. 假设在上面的数据中,基于域键已经发生了重新分区,所以会有两个分区(唯一的键是 domain1 和 domain2)。现在假设 domain1 和 domain2 重复了 1000000 次,我将基于域进行自加入。因此,对于每个域,我将获得大约 10^12 条记录。考虑到我们有两个分区,并且在连接过程中分区的数量没有变化,这两个新分区是否能够处理 1000000 条记录?

标签: pythonapache-sparkpysparkapache-spark-sqlpartition

解决方案


答案取决于数据的大小。当一个分区不能保存属于一个分区值的所有数据(例如domain1)时,将创建更多分区,最多创建spark.sql.shuffle.partitions多个。如果您的数据太大,即一个分区会超过 2GB 的限制(另请参阅为什么 Spark RDD 分区对 HDFS 有 2GB 限制?),重新分区将导致OutOfMemoryError
就像提供完整答案的旁注一样:能够将数据放入一个分区中并不一定意味着只为一个分区值生成一个分区。这取决于 - 其中包括 - 执行者的数量以及数据之前的分区方式。Spark 会尽量避免不必要的洗牌,因此可以为一个分区值生成多个分区。

因此,为了防止作业失败,您应该调整spark.sql.shuffle.partitions或将所需的分区数repartition与分区列一起传递。


推荐阅读