Spark: adding a new field inside an array of arrays

Problem description

I'm new to Spark and have the following df schema (part of it):

root
|-- value: binary (nullable = false)
|-- event: struct (nullable = false)
|    |-- eventtime: struct (nullable = true)
|    |    |-- seconds: long (nullable = true)
|    |    |-- ms: float (nullable = true)
|    |-- fault: struct (nullable = true)
|    |    |-- collections: struct (nullable = true)
|    |    |    |-- snapshots: array (nullable = false) --> ** FIRST LEVEL ARRAY (or array of arrays) **
|    |    |    |    |-- element: struct (containsNull = false)
|    |    |    |    |    |-- ringbuffer: struct (nullable = true)
|    |    |    |    |    |    |-- columns: array (nullable = false) --> ** SECOND LEVEL ARRAY **
|    |    |    |    |    |    |    |-- element: struct (containsNull = false)
|    |    |    |    |    |    |    |    |-- doubles: struct (nullable = true)
|    |    |    |    |    |    |    |    |    |-- values: array (nullable = false)
|    |    |    |    |    |    |    |    |    |    |-- element: float (containsNull = false)
....................................
..........................

I can add a new field to fault with the code below; the new field comp_id ends up under fault, at the same level as collections:

df.withColumn("event", col("event").withField("fault.comp_id", lit(1234)))

How do I add a new field inside the array of arrays? For example, how do I add a new test_field under columns? I tried to reach into the arrays by specifying the first index, 0:

df.withColumn("event",col("event").withField("fault.collections.snapshots.0.ringbuffer.columns.0.test_field", lit("test_value")))

but got this error:

org.apache.spark.sql.catalyst.parser.ParseException: mismatched input '.0' expecting {<EOF>, '.', '-'}(line 1, pos 25)

== SQL ==
fault.snapshots.snapshots.0.ringbuffer.columns.0.test_field
-------------------------^^^
  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:61)
  at org.apache.spark.sql.catalyst.expressions.UpdateFields$.nameParts(complexTypeCreator.scala:693)
  at org.apache.spark.sql.catalyst.expressions.UpdateFields$.apply(complexTypeCreator.scala:701)
  at org.apache.spark.sql.Column.withField(Column.scala:927)
  at org.apache.spark.sql.Dataset.transform(Dataset.scala:2751)
  at org.apache.spark.sql.Dataset.transform(Dataset.scala:2751)


So the desired schema would look like this:

root
|-- value: binary (nullable = false)
|-- event: struct (nullable = false)
|    |-- eventtime: struct (nullable = true)
|    |    |-- seconds: long (nullable = true)
|    |    |-- ms: float (nullable = true)
|    |-- fault: struct (nullable = true)
|    |    |-- collections: struct (nullable = true)
|    |    |    |-- snapshots: array (nullable = false) --> ** FIRST LEVEL ARRAY (or array of arrays) **
|    |    |    |    |-- element: struct (containsNull = false)
|    |    |    |    |    |-- ringbuffer: struct (nullable = true)
|    |    |    |    |    |    |-- columns: array (nullable = false) --> ** SECOND LEVEL ARRAY **
|    |    |    |    |    |    |    |-- element: struct (containsNull = false)
|    |    |    |    |    |    |    |    |-- doubles: struct (nullable = true)
|    |    |    |    |    |    |    |    |    |-- values: array (nullable = false)
|    |    |    |    |    |    |    |    |    |    |-- element: float (containsNull = false)
|    |    |    |    |    |    |    |    |-- test_field: string (nullable = true)

Tags: scala, apache-spark

Solution
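
One way to approach this (a minimal sketch, assuming Spark 3.1+ for Column.withField and the functions.transform higher-order function): withField only accepts struct field names in its path, not array indices, so each array level has to be rebuilt with transform, adding test_field to every element of columns.

import org.apache.spark.sql.functions.{col, lit, transform}

// Rebuild event.fault.collections.snapshots: for every snapshot element,
// rebuild ringbuffer.columns so that each column element gains test_field.
val result = df.withColumn(
  "event",
  col("event").withField(
    "fault.collections.snapshots",
    transform(
      col("event.fault.collections.snapshots"),
      snapshot => snapshot.withField(
        "ringbuffer.columns",
        transform(
          snapshot.getField("ringbuffer").getField("columns"),
          column => column.withField("test_field", lit("test_value"))
        )
      )
    )
  )
)

result.printSchema() should then show test_field as the last field of each columns element, matching the desired schema above.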

