scala - 在 Scala Spark Dataframe 中展平嵌套的 json
问题描述
我有多个来自任何 restapi 的 json,但我不知道它的架构。我无法使用 dataframes 的 explode 功能,因为我不知道由 spark api 创建的列名。
1.我们可以通过解码值来存储嵌套数组元素键的键dataframe.schema.fields
,因为spark只提供数据帧行中的值部分,并将顶级键作为列名。
数据框——
+--------------------+
| stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+
是否有任何最佳方法通过在运行时确定架构来使用数据框方法来展平 json。
示例 Json -:
{
"stackoverflow": [{
"tag": {
"id": 1,
"name": "scala",
"author": "Martin Odersky",
"frameworks": [
{
"id": 1,
"name": "Play Framework"
},
{
"id": 2,
"name": "Akka Framework"
}
]
}
},
{
"tag": {
"id": 2,
"name": "java",
"author": "James Gosling",
"frameworks": [
{
"id": 1,
"name": "Apache Tomcat"
},
{
"id": 2,
"name": "Spring Boot"
}
]
}
}
]
}
注意 - 我们需要在 dataframe 中进行所有操作,因为有大量数据即将到来,我们无法解析每一个 json。
解决方案
尽量避免展平所有列。
创建了辅助函数 & 你可以直接调用df.explodeColumns
DataFrame。
下面的代码将展平多级数组和结构类型列。
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try
implicit class DFHelpers(df: DataFrame) {
def columns = {
val dfColumns = df.columns.map(_.toLowerCase)
df.schema.fields.flatMap { data =>
data match {
case column if column.dataType.isInstanceOf[StructType] => {
column.dataType.asInstanceOf[StructType].fields.map { field =>
val columnName = column.name
val fieldName = field.name
col(s"${columnName}.${fieldName}").as(s"${columnName}_${fieldName}")
}.toList
}
case column => List(col(s"${column.name}"))
}
}
}
def flatten: DataFrame = {
val empty = df.schema.filter(_.dataType.isInstanceOf[StructType]).isEmpty
empty match {
case false =>
df.select(columns: _*).flatten
case _ => df
}
}
def explodeColumns = {
@tailrec
def columns(cdf: DataFrame):DataFrame = cdf.schema.fields.filter(_.dataType.typeName == "array") match {
case c if !c.isEmpty => columns(c.foldLeft(cdf)((dfa,field) => {
dfa.withColumn(field.name,explode_outer(col(s"${field.name}"))).flatten
}))
case _ => cdf
}
columns(df.flatten)
}
}
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try
defined class DFHelpers
扁平列
scala> df.printSchema
root
|-- stackoverflow: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- tag: struct (nullable = true)
| | | |-- author: string (nullable = true)
| | | |-- frameworks: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- id: long (nullable = true)
| | | | | |-- name: string (nullable = true)
| | | |-- id: long (nullable = true)
| | | |-- name: string (nullable = true)
scala> df.explodeColumns.printSchema
root
|-- author: string (nullable = true)
|-- frameworks_id: long (nullable = true)
|-- frameworks_name: string (nullable = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)
scala>
推荐阅读
- java - org.apache.poi.ss.usermodel 包可以从多个模块访问:poi、poi.ooxm
- javascript - Puppeteer 抛出无效参数错误
- sharepoint - 如何在在线共享点中进行完整的站点集合级别重新索引?
- java - java中的HQL会话
- git - Bitbucket 看不到新添加的文件
- javascript - 为什么我无法访问在对象的方法中创建的对象
- python - 绘制函数时出错,包括integrate.quad
- machine-learning - 屏幕截图 > 文本 (OCR) > 关键信息
- c++ - 在代码块文件(不是项目)中提供命令行参数
- node.js - 设置 MEAN 堆栈不呈现 app-root