首页 > 解决方案 > 使用 Spark/Scala 将嵌套 JSON 转换为 DataFrame

问题描述

我有一个嵌套的 JSON,我需要在其中转换为展平的 DataFrame,而无需在其中定义或分解任何列名。

val df = sqlCtx.read.option("multiLine",true).json("test.json")

所以这就是我的数据的样子:

[
  {
    "symbol": “TEST3",
    "timestamp": "2019-05-07 16:00:00",
    "priceData": {
      "open": "1177.2600",
      "high": "1179.5500",
      "low": "1176.6700",
      "close": "1179.5500",
      "volume": "49478"
    }
  },
  {
    "symbol": “TEST4",
    "timestamp": "2019-05-07 16:00:00",
    "priceData": {
      "open": "189.5660",
      "high": "189.9100",
      "low": "189.5100",
      "close": "189.9100",
      "volume": "267986"
    }
  }
]

标签: scalaapache-sparkapache-spark-sqlspark-streaming

解决方案


这是使用DatabricksDataFrameFlattener实现的类的一种方法:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructType}

implicit class DataFrameFlattener(df: DataFrame) {
      def flattenSchema: DataFrame = {
        df.select(flatten(Nil, df.schema): _*)
      }

      protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
        case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name, f.dataType))
        case other => col(path.map(n => s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
      }
    }

df.flattenSchema.show

和输出:

+---------------+--------------+-------------+--------------+----------------+------+-------------------+
|priceData.close|priceData.high|priceData.low|priceData.open|priceData.volume|symbol|          timestamp|
+---------------+--------------+-------------+--------------+----------------+------+-------------------+
|      1179.5500|     1179.5500|    1176.6700|     1177.2600|           49478| TEST3|2019-05-07 16:00:00|
|       189.9100|      189.9100|     189.5100|      189.5660|          267986| TEST4|2019-05-07 16:00:00|
+---------------+--------------+-------------+--------------+----------------+------+-------------------+

或者你可以执行一个普通的选择:

df.select(
  "priceData.close", 
  "priceData.high", 
  "priceData.low", 
  "priceData.open", 
  "priceData.volume", 
  "symbol", 
  "timestamp").show

输出:

+---------+---------+---------+---------+------+------+-------------------+
|    close|     high|      low|     open|volume|symbol|          timestamp|
+---------+---------+---------+---------+------+------+-------------------+
|1179.5500|1179.5500|1176.6700|1177.2600| 49478| TEST3|2019-05-07 16:00:00|
| 189.9100| 189.9100| 189.5100| 189.5660|267986| TEST4|2019-05-07 16:00:00|
+---------+---------+---------+---------+------+------+-------------------+

推荐阅读