首页 > 解决方案 > 在 Scala 中使用 from_json() 解析 DataFrame 中的多行

问题描述

我在 Spark DataFrame 的列中有一个 JSON,如下所示:

ID|           Text|           JSON
------------------------------------------------------------------------------
1|             xyz|          [{"Hour": 1, "Total": 10, "Fail": 1}, {"Hour": 2, "Total": 40, "Fail": 4}, {"Hour": 3, "Total": 20, "Fail": 2}]

我正在使用以下架构

val schema = StructType(Array(StructField("Hour", IntegerType),
   StructField("Total", IntegerType), StructField("Fail", IntegerType))

我正在使用以下代码来解析 DataFrame 并将 JSON 输出为多列

val newDF = DF.withColumn("JSON", from_json(col("JSON"), schema)).select(col("JSON.*"))
newDF.show()

上面的代码只是从 JSON 中解析出一条记录。但是,我希望它解析 JSON 中的所有记录。

输出如下:

Hour|       Total|       Fail|
-------------------------------
   1|          10|          1|
-------------------------------

但是,我希望输出如下:

Hour|       Total|       Fail|
-------------------------------
   1|          10|          1|
   2|          40|          4|
   3|          20|          2|
-------------------------------

有人可以告诉我。我错过了什么!!

提前致谢。

标签: jsonscalaapache-sparkdataframe

解决方案


如果我正确解释了您的示例数据,则您的JSON列是带有已发布架构的 JSON 元素序列。您需要在应用之前分解列from_json,如下所示:

val df = Seq(
  (1, "xyz", Seq("""{"Hour": 1, "Total": 10, "Fail": 1}""",
                 """{"Hour": 2, "Total": 40, "Fail": 4}""",
                 """{"Hour": 3, "Total": 20, "Fail": 2}""")
  )).toDF("ID", "Text", "JSON")

import org.apache.spark.sql.types._

val jsonSchema = StructType(Array(
  StructField("Hour", IntegerType),
  StructField("Total", IntegerType),
  StructField("Fail", IntegerType)
))

df.
  withColumn("JSON", explode(col("JSON"))).
  withColumn("JSON", from_json(col("JSON"), jsonSchema)).
  select("JSON.*").
  show
// +----+-----+----+
// |Hour|Total|Fail|
// +----+-----+----+
// |   1|   10|   1|
// |   2|   40|   4|
// |   3|   20|   2|
// +----+-----+----+

推荐阅读