scala - How to extract values from JSON-encoded column?
Problem description
I have a Spark DataFrame with the following content:
[
  {
    "map": {
      "completed-stages": 1,
      "total-stages": 1
    },
    "rec": "test-plan",
    "status": {
      "state": "SUCCESS"
    }
  },
  {
    "map": {
      "completed-stages": 1,
      "total-stages": 1
    },
    "rec": "test-proc",
    "status": {
      "state": "FAILED"
    }
  }
]
I want to transform it into another DataFrame with the following content:
[{"rec": "test-plan", "status": "SUCCESS"}, {"rec": "test-proc", "status": "FAILED"}]
I have written the following code, but it doesn't compile and complains about the encoding.
val fdf = DF.map(f => {
  val listCommands = f.get(0).asInstanceOf[WrappedArray[Map[String, Any]]]
  val m = listCommands.map(h => {
    var rec = "none"
    var status = "none"
    if(h.exists("status" == "state" -> _)) {
      status = (h.get("status") match {
        case Some(x) => x.asInstanceOf[HashMap[String, String]].getOrElse("state", "none")
        case _ => "none"
      })
      if(h.contains("rec")) {
        rec = (h.get("rec") match {
          case Some(x: String) => x
          case _ => "none"
        })
      }
    }
    Map("status"->status, "rec"->rec)
  })
  val rm = m.flatten
  rm
})
Please suggest the right way.
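Solution
This is going to be tricky because the top-level elements of the JSON records are not the same: one record has map1 and the other has map2, so the schema is inconsistent. I would talk to the "data producer" and ask for a change so that the name of the command is described by a separate element.
Given the following schema of the DataFrame: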
scala> commands.printSchema
root
|-- commands: array (nullable = true)
| |-- element: string (containsNull = true)
and the number of elements (rows) in it:
scala> commands.count
res1: Long = 1
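The answer starts from an existing DataFrame. To follow along locally, one way to build a DataFrame with the same shape is sketched below. The sample strings are hypothetical: they mirror the question's data but use the map1/map2 keys the answer assumes, and the local SparkSession settings are arbitrary choices for experimenting.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimenting (in spark-shell, `spark` already exists)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("json-column-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample records, shaped like the question's data but using the
// map1/map2 keys this answer assumes
val rawJson = Seq(
  """{"map1": {"completed-stages": 1, "total-stages": 1}, "rec": "test-plan", "status": {"state": "SUCCESS"}}""",
  """{"map2": {"completed-stages": 1, "total-stages": 1}, "rec": "test-proc", "status": {"state": "FAILED"}}"""
)

// A single row whose only column, `commands`, is an array<string> of JSON documents
val commands = Seq(Tuple1(rawJson)).toDF("commands")

commands.printSchema()
```

Wrapping the sequence in Tuple1 gives exactly one row, which matches the count of 1 shown above.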
You have to explode the array of elements first and then access the fields of interest in each command.
// 1. Explode the array
val commandsExploded = commands.select(explode($"commands") as "command")
scala> commandsExploded.count
res2: Long = 2
Let's create a schema for the JSON-encoded records. One possibility is as follows.
// Note that it accepts map1 and map2 fields
import org.apache.spark.sql.types._
val schema = StructType(
StructField("map1",
StructType(
StructField("completed-stages", LongType, true) ::
StructField("total-stages", LongType, true) :: Nil), true) ::
StructField("map2",
StructType(
StructField("completed-stages", LongType, true) ::
StructField("total-stages", LongType, true) :: Nil), true) ::
StructField("rec", StringType,true) ::
StructField("status", StructType(
StructField("state", StringType, true) :: Nil), true
) :: Nil)
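As an aside, the same schema can probably be written more compactly as a DDL string parsed with StructType.fromDDL. This sketch assumes a Spark version that provides StructType.fromDDL (2.2 or later, worth checking against your version); the names `ddl` and `schemaFromDdl` are only illustrative.

```scala
import org.apache.spark.sql.types.StructType

// The answer's schema expressed as a DDL string; backticks are needed because
// the field names contain hyphens
val ddl =
  "map1 STRUCT<`completed-stages`: BIGINT, `total-stages`: BIGINT>, " +
  "map2 STRUCT<`completed-stages`: BIGINT, `total-stages`: BIGINT>, " +
  "rec STRING, " +
  "status STRUCT<state: STRING>"

// Parse the DDL into a StructType (fields without NOT NULL come out nullable)
val schemaFromDdl: StructType = StructType.fromDDL(ddl)

schemaFromDdl.printTreeString()
```

The result could then be passed to from_json in place of `schema`; BIGINT corresponds to LongType, so the field types match the hand-built StructType.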
With the schema in place, use the from_json standard function, which takes a column of JSON-encoded strings and a schema.
val commands = commandsExploded.select(from_json($"command", schema) as "command")
scala> commands.show(truncate = false)
+-------------------------------+
|command |
+-------------------------------+
|[[1, 1],, test-plan, [SUCCESS]]|
|[, [1, 1], test-proc, [FAILED]]|
+-------------------------------+
Let's have a look at the schema of the commands dataset.
scala> commands.printSchema
root
|-- command: struct (nullable = true)
| |-- map1: struct (nullable = true)
| | |-- completed-stages: long (nullable = true)
| | |-- total-stages: long (nullable = true)
| |-- map2: struct (nullable = true)
| | |-- completed-stages: long (nullable = true)
| | |-- total-stages: long (nullable = true)
| |-- rec: string (nullable = true)
| |-- status: struct (nullable = true)
| | |-- state: string (nullable = true)
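The fields of the command struct, such as rec and status (itself a struct holding state), can be accessed using the dot notation.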
val recs = commands.select(
$"command.rec" as "rec",
$"command.status.state" as "status")
scala> recs.show
+---------+-------+
| rec| status|
+---------+-------+
|test-plan|SUCCESS|
|test-proc| FAILED|
+---------+-------+
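If only rec and status.state are needed, a possible alternative (a different technique from the from_json approach above) is get_json_object with JSONPath expressions. It never has to declare map1 or map2, so the inconsistent top-level keys stop mattering. This is a self-contained sketch that assumes a local SparkSession and the same hypothetical sample strings as before.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, get_json_object}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("get-json-object-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical input: the same array<string> column as in the answer
val commands = Seq(Tuple1(Seq(
  """{"map1": {"completed-stages": 1, "total-stages": 1}, "rec": "test-plan", "status": {"state": "SUCCESS"}}""",
  """{"map2": {"completed-stages": 1, "total-stages": 1}, "rec": "test-proc", "status": {"state": "FAILED"}}"""
))).toDF("commands")

// Explode to one JSON string per row, then extract just the two fields of
// interest; the varying map1/map2 keys are never touched
val recs = commands
  .select(explode($"commands") as "command")
  .select(
    get_json_object($"command", "$.rec") as "rec",
    get_json_object($"command", "$.status.state") as "status")

recs.show()
```

The trade-off is that get_json_object always returns strings and re-parses the JSON for every path, so from_json with a schema is likely the better fit when many fields or typed values are needed.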
Converting it to a single-record JSON-encoded dataset requires Dataset.toJSON followed by the collect_list standard function.
val result = recs.toJSON.agg(collect_list("value"))
scala> result.show(truncate = false)
+-------------------------------------------------------------------------------+
|collect_list(value) |
+-------------------------------------------------------------------------------+
|[{"rec":"test-plan","status":"SUCCESS"}, {"rec":"test-proc","status":"FAILED"}]|
+-------------------------------------------------------------------------------+
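The result above is a single row holding an array of JSON strings. If a single JSON string is preferred instead, one option is to combine struct, collect_list and to_json. This sketch assumes Spark 2.3 or later (where to_json accepts an array of structs); `recs` here is a hypothetical stand-in built from literal values.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct, to_json}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("to-json-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the `recs` DataFrame built in the answer
val recs = Seq(("test-plan", "SUCCESS"), ("test-proc", "FAILED")).toDF("rec", "status")

// Pack every row into a struct, gather the structs into one array and
// serialise that array as a single JSON string column named `json`
val result = recs.agg(to_json(collect_list(struct($"rec", $"status"))) as "json")

result.show(truncate = false)
```

Note that collect_list does not guarantee element order after a shuffle, so the order of objects in the array should not be relied upon.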