首页 > 解决方案 > 从 spark 写入 hadoop 的最佳实践

问题描述

我正在查看一位同事编写的一些代码,我发现了一个这样的方法:

def writeFile(df: DataFrame,
              partitionCols: List[String],
              writePath: String): Unit {

    val df2 = df.repartition(partitionCols.get.map(col): _*)
    val dfWriter = df2.write.partitionBy(partitionCols.get.map(col): _*)
    dfWriter
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .option("compression", "snappy")
        .save(writePath)

}

repartition像这样调用一组预定义的列,然后调用partitionBy,然后保存到磁盘通常是一种好习惯吗?

标签: scalaapache-sparkpartition

解决方案


通常,您repartition使用与 相同的列进行调用,partitionBy以便在每个分区中有一个 parquet 文件。这是在这里实现的。现在您可以争辩说,这可能意味着 parquet 文件大小变大或更糟可能导致内存溢出。

这个问题通常通过向 Dataframe 添加 row_number 来处理,然后指定每个 parquet 文件可以拥有的文档数。就像是

val repartitionExpression =colNames.map(col) :+ floor(col(RowNumber) / docsPerPartition)
// now use this to repartition 

要回答下一部分,因为persist after partitionBy这里不需要,因为分区后它直接写入磁盘。


推荐阅读