首页 > 解决方案 > 让 PySpark 每列值输出一个文件(重新分区/分区不工作)

问题描述

我已经看到很多答案和 blob 帖子表明:

df.repartition('category').write().partitionBy('category')

df将为每个类别输出一个文件,但如果唯一“类别”值的数量小于默认分区的数量(通常为 200),则这似乎不是真的。

当我对具有 100 个类别的文件使用上述代码时,我最终会得到 100 个文件夹,每个文件夹包含 1 到 3 个“部分”文件,而不是在同一个“部分”中包含具有给定“类别”值的所有行。https://stackoverflow.com/a/42780452/529618的答案似乎可以解释这一点。

每个分区值获得一个文件的最快方法是什么?


我尝试过的事情

我见过很多声称

df.repartition(1, 'category').write().partitionBy('category')
df.repartition(2, 'category').write().partitionBy('category')

将分别创建“每个类别恰好一个文件”和“每个类别恰好两个文件”,但这似乎不是这个参数的工作方式。文档清楚地表明,参数numPartitions是要创建的分区总数,而不是每列值的分区数。根据该文档,将此参数指定为 1 应该(意外地)在写入文件时为每个分区输出一个文件,但可能只是因为它删除了所有并行性并强制您的整个 RDD 在单个节点上进行洗牌/重新计算。

required_partitions = df.select('category').distinct().count()
df.repartition(required_partitions, 'category').write().partitionBy('category')

以上似乎是一种基于记录行为的解决方法,但由于多种原因,这种方法代价高昂。一方面,如果 df 昂贵且未缓存(和/或太大以至于仅为此目的缓存将是浪费的),则单独计数,并且数据帧的任何重新分区都可能导致多阶段工作流程中不必要的改组一路上有各种数据帧输出。

标签: apache-sparkpysparkpartitioning

解决方案


“最快”的方式可能取决于实际的硬件设置和实际数据(以防它出现偏差)。据我所知,我也同意这df.repartition('category').write().partitionBy('category')无助于解决您的问题。

我们在我们的应用程序中遇到了类似的问题,但是我们没有先进行计数,然后再进行重新分区,而是将数据的写入和每个分区只有一个文件的要求分离到两个不同的 Spark 作业中。第一个作业被优化为写入数据。第二个作业只是遍历分区文件夹结构并简单地读取每个文件夹/分区的数据,将其数据合并到一个分区并将它们覆盖回来。同样,我无法判断这是否也是您环境中最快的方法,但对我们来说,它成功了。

在对这个主题进行了一些研究之后,Databricks 上的Auto Optimize Writes功能用于写入 Delta 表。在这里,他们使用了类似的方法:首先写入数据,然后运行单独的 OPTIMIZE 作业以将文件聚合到单个文件中。在提到的链接中,您将找到以下说明:

“在单独写入之后,Azure Databricks 会检查文件是否可以进一步压缩,并运行 OPTIMIZE 作业 [...] 以进一步压缩具有最多小文件的分区的文件。”

附带说明:确保将配置保持spark.sql.files.maxRecordsPerFile为 0(默认值)或负数。否则,仅此配置可能会导致“类别”列中具有相同值的数据的多个文件。


推荐阅读