首页 > 解决方案 > 如果我明确传递模式,是否需要在 spark 中使用“mergeSchema”选项和镶木地板?

问题描述

从火花文档:

由于模式合并是一项相对昂贵的操作,并且在大多数情况下不是必需的,因此我们从 1.5.0 开始默认将其关闭。您可以通过在读取 Parquet 文件时将数据源选项 mergeSchema 设置为 true 来启用它(如下例所示),或者将全局 SQL 选项 spark.sql.parquet.mergeSchema 设置为 true。

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

我对文档的理解是,如果我有多个具有不同模式的镶木地板分区,如果我使用 .spark 将能够自动合并这些模式spark.read.option("mergeSchema", "true").parquet(path)

如果我在查询时不知道这些分区中存在哪些模式,这似乎是一个不错的选择。

但是,考虑一下我有两个分区的情况,一个使用旧模式,一个使用新模式,不同之处仅在于有一个附加字段。我们还假设我的代码知道新模式并且我能够显式地传递这个模式。

在这种情况下,我会做类似的事情spark.read.schema(my_new_schema).parquet(path)。我希望 Spark 在这种情况下会做的是使用新模式在两个分区中读取,并简单地将新列的空值提供给旧分区中的任何行。这是预期的行为吗?或者我是否也需要option("mergeSchema", "true")在这种情况下使用?

我希望尽可能避免使用 mergeSchema 选项,以避免文档中提到的额外开销。

标签: apache-sparkparquet

解决方案


我尝试从上面链接的 spark 文档中扩展示例代码,我的假设似乎是正确的。见下文:

// This is used to implicitly convert an RDD to a DataFrame.
scala> import spark.implicits._
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF: org.apache.spark.sql.DataFrame = [value: int, square: int]

scala> squaresDF.write.parquet("test_data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
scala> val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
scala> cubesDF: org.apache.spark.sql.DataFrame = [value: int, cube: int]

scala> cubesDF.write.parquet("test_data/test_table/key=2")

// Read the partitioned table

scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("test_data/test_table")
mergedDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]

scala> mergedDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)


// Read without mergeSchema option
scala> val naiveDF = spark.read.parquet("test_data/test_table")
naiveDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 1 more field]

// Note that cube column is missing.
scala> naiveDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- key: integer (nullable = true)


// Take the schema from the mergedDF above and use it to read the same table with an explicit schema, but without the "mergeSchema" option.
scala> val explicitSchemaDF = spark.read.schema(mergedDF.schema).parquet("test_data/test_table")
explicitSchemaDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]

// Spark was able to use the correct schema despite not using the "mergeSchema" option
scala> explicitSchemaDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)

// Data is as expected.
scala> explicitSchemaDF.show()
+-----+------+----+---+
|value|square|cube|key|
+-----+------+----+---+
|    3|     9|null|  1|
|    4|    16|null|  1|
|    5|    25|null|  1|
|    8|  null| 512|  2|
|    9|  null| 729|  2|
|   10|  null|1000|  2|
|    1|     1|null|  1|
|    2|     4|null|  1|
|    6|  null| 216|  2|
|    7|  null| 343|  2|
+-----+------+----+---+

如您所见,在使用显式模式读取数据时,spark 似乎正确地为 parquet 分区中缺少的任何列提供空值。

这让我感到相当有信心,我可以用“不,在这种情况下不需要 mergeSchema 选项”来回答我的问题,但我仍然想知道是否有任何需要注意的警告。其他人的任何额外帮助将不胜感激。


推荐阅读