首页 > 解决方案 > 如何处理火花中缺少的嵌套字段?

问题描述

给定两个案例类:

case class Response(
  responseField: String
  ...
  items: List[Item])

case class Item(
  itemField: String
  ...)

我正在创建一个Response数据集:

val dataset = spark.read.format("parquet")
                .load(inputPath)
                .as[Response]
                .map(x => x)

itemField当任何行中不存在时,就会出现问题,并且 spark 会引发此错误org.apache.spark.sql.AnalysisException: No such struct field itemField。如果itemField没有嵌套,我可以通过dataset.withColumn("itemField", lit("")). 是否有可能在该List领域内做同样的事情?

标签: scalaapache-sparkapache-spark-sql

解决方案


我假设如下:

数据是使用以下模式编写的:

case class Item(itemField: String)
case class Response(responseField: String, items: List[Item])
Seq(Response("a", List()), Response("b", List())).toDF.write.parquet("/tmp/structTest")

现在架构更改为:

case class Item(itemField: String, newField: Int)
case class Response(responseField: String, items: List[Item])
spark.read.parquet("/tmp/structTest").as[Response].map(x => x) // Fails

对于 Spark 2.4,请参阅: Spark - 如何将元素添加到结构数组

对于 Spark 2.3,这应该有效:

val addNewField: (Array[String], Array[Int]) => Array[Item] = (itemFields, newFields) => itemFields.zip(newFields).map { case (i, n) => Item(i, n) }

val addNewFieldUdf = udf(addNewField)
spark.read.parquet("/tmp/structTest")
   .withColumn("items", addNewFieldUdf(
      col("items.itemField") as "itemField", 
      array(lit(1)) as "newField"
   )).as[Response].map(x => x) // Works

推荐阅读