首页 > 解决方案 > Spark:为 MergeSchema 字段选择默认值

问题描述

我有一个镶木地板,它有一个像这样的旧模式:

| name | gender | age |
| Tom  | Male   | 30  |

随着我们的架构更新为:

| name | gender | age | office |

我们在读取旧 parquet 时使用了 mergeSchema:

val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")

但是从这些旧的镶木地板文件中读取时,我得到了以下输出:

| name | gender | age | office |
| Tom  | Male   | 30  | null   |

这是正常的。office但是,当且仅当旧模式中不存在该字段时,我想为(例如“加利福尼亚”)采用默认值。可能吗 ?

标签: scalaapache-spark

解决方案


当某些 parquet 文件中不存在列但存在于其他 parquet 文件中时,您没有任何简单的方法来设置默认值

在 Parquet 文件格式中,每个 Parquet 文件都包含架构定义。默认情况下,当读取 parquet 时,Spark 从 parquet 文件中获取模式。选项的唯一效果mergeSchema是,Spark 不会从一个随机 parquet 文件中检索模式,而是mergeSchema读取所有 parquet 文件的所有模式并将它们合并。

所以你不能在不修改 parquet 文件的情况下设置默认值。

另一种可能的方法是在读取镶木地板时通过设置如下选项来提供您自己的模式.schema()

spark.read.schema(StructType(Array(FieldType("name", StringType), ...)).parquet(...)

但在这种情况下,没有设置默认值的选项

所以唯一剩下的解决方案是手动添加列默认值

如果我们有两个镶木地板,第一个包含旧模式的数据:

+----+------+---+
|name|gender|age|
+----+------+---+
|Tom |Male  |30 |
+----+------+---+

第二个包含具有新架构的数据:

+-----+------+---+------+
|name |gender|age|office|
+-----+------+---+------+
|Jane |Female|45 |Idaho |
|Roger|Male  |22 |null  |
+-----+------+---+------+

如果您不想替换null"office" 列中的所有值,可以使用.na.fill如下:

spark.read.option("mergeSchema", "true").parquet(path).na.fill("California", Array("office"))

你得到以下结果:

+-----+------+---+----------+
|name |gender|age|office    |
+-----+------+---+----------+
|Jane |Female|45 |Idaho     |
|Roger|Male  |22 |California|
|Tom  |Male  |30 |California|
+-----+------+---+----------+

如果您希望只有旧数据获得默认值,则必须将每个镶木地板文件读取到数据框,必要时添加具有默认值的列,然后合并所有生成的数据框:

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.util.CaseInsensitiveStringMap

ParquetTable("my_table",
  sparkSession = spark,
  options = CaseInsensitiveStringMap.empty(),
  paths = Seq(path),
  userSpecifiedSchema = None,
  fallbackFileFormat = classOf[ParquetFileFormat]
).fileIndex.allFiles().map(file => {
  val dataframe = spark.read.parquet(file.getPath.toString)

  if (dataframe.columns.contains("office")) {
    dataframe
  } else {
    dataframe.withColumn("office", lit("California"))
  }
}).reduce(_ unionByName _)

你得到以下结果:

+-----+------+---+----------+
|name |gender|age|office    |
+-----+------+---+----------+
|Jane |Female|45 |Idaho     |
|Roger|Male  |22 |null      |
|Tom  |Male  |30 |California|
+-----+------+---+----------+

请注意,所有部分ParquetTable([...].allFiles()都是检索镶木地板文件列表。如果您使用 hadoop 或本地文件系统,则可以简化它。


推荐阅读