apache-spark - 如果我明确传递模式,是否需要在 spark 中使用“mergeSchema”选项和镶木地板?
问题描述
从火花文档:
由于模式合并是一项相对昂贵的操作,并且在大多数情况下不是必需的,因此我们从 1.5.0 开始默认将其关闭。您可以通过在读取 Parquet 文件时将数据源选项 mergeSchema 设置为 true 来启用它(如下例所示),或者将全局 SQL 选项 spark.sql.parquet.mergeSchema 设置为 true。
(https://spark.apache.org/docs/latest/sql-data-sources-parquet.html)
我对文档的理解是,如果我有多个具有不同模式的镶木地板分区,如果我使用 .spark 将能够自动合并这些模式spark.read.option("mergeSchema", "true").parquet(path)
。
如果我在查询时不知道这些分区中存在哪些模式,这似乎是一个不错的选择。
但是,考虑一下我有两个分区的情况,一个使用旧模式,一个使用新模式,不同之处仅在于有一个附加字段。我们还假设我的代码知道新模式并且我能够显式地传递这个模式。
在这种情况下,我会做类似的事情spark.read.schema(my_new_schema).parquet(path)
。我希望 Spark 在这种情况下会做的是使用新模式在两个分区中读取,并简单地将新列的空值提供给旧分区中的任何行。这是预期的行为吗?或者我是否也需要option("mergeSchema", "true")
在这种情况下使用?
我希望尽可能避免使用 mergeSchema 选项,以避免文档中提到的额外开销。
解决方案
我尝试从上面链接的 spark 文档中扩展示例代码,我的假设似乎是正确的。见下文:
// This is used to implicitly convert an RDD to a DataFrame.
scala> import spark.implicits._
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF: org.apache.spark.sql.DataFrame = [value: int, square: int]
scala> squaresDF.write.parquet("test_data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
scala> val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
scala> cubesDF: org.apache.spark.sql.DataFrame = [value: int, cube: int]
scala> cubesDF.write.parquet("test_data/test_table/key=2")
// Read the partitioned table
scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("test_data/test_table")
mergedDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]
scala> mergedDF.printSchema()
root
|-- value: integer (nullable = true)
|-- square: integer (nullable = true)
|-- cube: integer (nullable = true)
|-- key: integer (nullable = true)
// Read without mergeSchema option
scala> val naiveDF = spark.read.parquet("test_data/test_table")
naiveDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 1 more field]
// Note that cube column is missing.
scala> naiveDF.printSchema()
root
|-- value: integer (nullable = true)
|-- square: integer (nullable = true)
|-- key: integer (nullable = true)
// Take the schema from the mergedDF above and use it to read the same table with an explicit schema, but without the "mergeSchema" option.
scala> val explicitSchemaDF = spark.read.schema(mergedDF.schema).parquet("test_data/test_table")
explicitSchemaDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]
// Spark was able to use the correct schema despite not using the "mergeSchema" option
scala> explicitSchemaDF.printSchema()
root
|-- value: integer (nullable = true)
|-- square: integer (nullable = true)
|-- cube: integer (nullable = true)
|-- key: integer (nullable = true)
// Data is as expected.
scala> explicitSchemaDF.show()
+-----+------+----+---+
|value|square|cube|key|
+-----+------+----+---+
| 3| 9|null| 1|
| 4| 16|null| 1|
| 5| 25|null| 1|
| 8| null| 512| 2|
| 9| null| 729| 2|
| 10| null|1000| 2|
| 1| 1|null| 1|
| 2| 4|null| 1|
| 6| null| 216| 2|
| 7| null| 343| 2|
+-----+------+----+---+
如您所见,在使用显式模式读取数据时,spark 似乎正确地为 parquet 分区中缺少的任何列提供空值。
这让我感到相当有信心,我可以用“不,在这种情况下不需要 mergeSchema 选项”来回答我的问题,但我仍然想知道是否有任何需要注意的警告。其他人的任何额外帮助将不胜感激。
推荐阅读
- php - XAMPP:在 OSX 上安装 Php-intl 有问题
- powershell - 按文件名Powershell中的日期列出文件
- python - 浮点范围的百分位数
- sql - 使用When条件的Sql更新查询
- javascript - 我应该如何以角度实现动态路由
- php - 为什么来自 Mac 机器的简单 PHP Curl https POST 请求需要 30 多秒才能完成?
- ios - TVOS:无法使用 GIFU 框架显示 GIF
- ruby-on-rails-5 - “Rails”我想异步实现一个跟随按钮
- java - Mac OS Big Sur 更新后 Eclipse 09-2020 中的 Emtpy 调试窗口
- jquery - 包含 jQuery.ajax 中使用的问号的 URL 在特定设置中发生变化(Docker、Nginx 重写规则)