scala - Spark:为 MergeSchema 字段选择默认值
问题描述
我有一个镶木地板,它有一个像这样的旧模式:
| name | gender | age |
| Tom | Male | 30 |
随着我们的架构更新为:
| name | gender | age | office |
我们在读取旧 parquet 时使用了 mergeSchema:
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
但是从这些旧的镶木地板文件中读取时,我得到了以下输出:
| name | gender | age | office |
| Tom | Male | 30 | null |
这是正常的。office
但是,当且仅当旧模式中不存在该字段时,我想为(例如“加利福尼亚”)采用默认值。可能吗 ?
解决方案
当某些 parquet 文件中不存在列但存在于其他 parquet 文件中时,您没有任何简单的方法来设置默认值
在 Parquet 文件格式中,每个 Parquet 文件都包含架构定义。默认情况下,当读取 parquet 时,Spark 从 parquet 文件中获取模式。选项的唯一效果mergeSchema
是,Spark 不会从一个随机 parquet 文件中检索模式,而是mergeSchema
读取所有 parquet 文件的所有模式并将它们合并。
所以你不能在不修改 parquet 文件的情况下设置默认值。
另一种可能的方法是在读取镶木地板时通过设置如下选项来提供您自己的模式.schema()
:
spark.read.schema(StructType(Array(FieldType("name", StringType), ...)).parquet(...)
但在这种情况下,没有设置默认值的选项。
所以唯一剩下的解决方案是手动添加列默认值
如果我们有两个镶木地板,第一个包含旧模式的数据:
+----+------+---+
|name|gender|age|
+----+------+---+
|Tom |Male |30 |
+----+------+---+
第二个包含具有新架构的数据:
+-----+------+---+------+
|name |gender|age|office|
+-----+------+---+------+
|Jane |Female|45 |Idaho |
|Roger|Male |22 |null |
+-----+------+---+------+
如果您不想替换null
"office" 列中的所有值,可以使用.na.fill
如下:
spark.read.option("mergeSchema", "true").parquet(path).na.fill("California", Array("office"))
你得到以下结果:
+-----+------+---+----------+
|name |gender|age|office |
+-----+------+---+----------+
|Jane |Female|45 |Idaho |
|Roger|Male |22 |California|
|Tom |Male |30 |California|
+-----+------+---+----------+
如果您希望只有旧数据获得默认值,则必须将每个镶木地板文件读取到数据框,必要时添加具有默认值的列,然后合并所有生成的数据框:
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.util.CaseInsensitiveStringMap
ParquetTable("my_table",
sparkSession = spark,
options = CaseInsensitiveStringMap.empty(),
paths = Seq(path),
userSpecifiedSchema = None,
fallbackFileFormat = classOf[ParquetFileFormat]
).fileIndex.allFiles().map(file => {
val dataframe = spark.read.parquet(file.getPath.toString)
if (dataframe.columns.contains("office")) {
dataframe
} else {
dataframe.withColumn("office", lit("California"))
}
}).reduce(_ unionByName _)
你得到以下结果:
+-----+------+---+----------+
|name |gender|age|office |
+-----+------+---+----------+
|Jane |Female|45 |Idaho |
|Roger|Male |22 |null |
|Tom |Male |30 |California|
+-----+------+---+----------+
请注意,所有部分ParquetTable([...].allFiles()
都是检索镶木地板文件列表。如果您使用 hadoop 或本地文件系统,则可以简化它。
推荐阅读
- ros - Rosplay "Bag time" 在 rosrecord 后 20 秒开始
- r - R 和“plm”包:如何使用 lapply 循环遍历“pdata.frame”对象的列
- javascript - 在 data-href URL 中使用哈希时,Facebook 审核链接不显示
- html - 在不同的视口上保持列高
- jquery - 我们如何在剑道网格的动态添加字段中使用转义?
- javascript - 无法使用 angular2 正确显示引导轮播数据
- azure - 如何在 Azure B2C 中的注册和登录策略中添加验证码
- mysql - MySQL UPDATE 与 LEFT JOINS
- python-3.x - I want to get variable values from jenkins in my python code for desired capabilities
- r - 在闪亮的应用程序中刷新数据