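json - Scala - How to convert JSON keys and values into columns
Problem description
How can I parse the input JSON below into key and value columns? Any help is appreciated.
Input: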
{
  "name" : "srini",
  "value": {
    "1" : "val1",
    "2" : "val2",
    "3" : "val3"
  }
}
Output DataFrame columns:
name   key  value
-----------------------------
srini  1    val1
srini  2    val2
srini  3    val3
//++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Input DataFrame:
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|json_file |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"file_path":"AAA/BBB.CCC.zip","file_name":"AAA_20200202122754.json","received_time":"2020-03-31","obj_cls":"Monitor","obj_cls_inst":"Monitor","relation_tree":"Source~>HD_Info~>Monitor","s_tag":"ABC1234","Monitor":{"Index":"0","Vendor_Data":"58F5Y","Monitor_Type":"Lenovo Monitor","HnfoID":"650FEC74"}}|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
How can the JSON string above be converted into a DataFrame like the one below?
+----------------+-----------------------+--------------+--------+-------------+-------------------------+----------+----------------+----------------+
|file_path |file_name |received_time |obj_cls |obj_cls_inst |relation_tree |s_tag |attribute_name |attribute_value |
+----------------+-----------------------+--------------+--------+-------------+-------------------------+----------+----------------+----------------+
|AAA/BBB.CCC.zip |AAA_20200202122754.json|2020-03-31 |Monitor |Monitor |Source~>HD_Info~>Monitor |ABC1234 |Index |0 |
+----------------+-----------------------+--------------+--------+-------------+-------------------------+----------+----------------+----------------+
|AAA/BBB.CCC.zip |AAA_20200202122754.json|2020-03-31 |Monitor |Monitor |Source~>HD_Info~>Monitor |ABC1234 |Vendor_Data |58F5Y |
+----------------+-----------------------+--------------+--------+-------------+-------------------------+----------+----------------+----------------+
|AAA/BBB.CCC.zip |AAA_20200202122754.json|2020-03-31 |Monitor |Monitor |Source~>HD_Info~>Monitor |ABC1234 |Monitor_Type |Lenovo Monitor |
+----------------+-----------------------+--------------+--------+-------------+-------------------------+----------+----------------+----------------+
|AAA/BBB.CCC.zip |AAA_20200202122754.json|2020-03-31 |Monitor |Monitor |Source~>HD_Info~>Monitor |ABC1234 |HnfoID |650FEC74 |
+----------------+-----------------------+--------------+--------+-------------+-------------------------+----------+----------------+----------------+
//**********************************************
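import org.apache.spark.sql.functions.lit
// Assumption: the "id" value is meant to be "1"; as posted, the literal was not valid JSON
val rawData = sparkSession.sql("select 1")
  .withColumn("obj_cls", lit("First"))
  .withColumn("s_tag", lit("S_12345"))
  .withColumn("jsonString", lit("""{"id":"1","First":{"Info":"ABCD123","Res":"5.2"}}"""))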
Solution
After loading the JSON into a DataFrame, it looks like this:
+-----+------------------+
| name|             value|
+-----+------------------+
|srini|[val1, val2, val3]|
+-----+------------------+
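The loading step is not shown in the answer. A minimal sketch of one way to do it, assuming a local SparkSession and the sample JSON held in a string (the names spark, json and df are illustrative, not from the original post):

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("json-to-df").getOrCreate()
import spark.implicits._

// The sample input from the question, on one line.
val json = """{"name":"srini","value":{"1":"val1","2":"val2","3":"val3"}}"""

// spark.read.json accepts a Dataset[String] and infers the schema:
// name becomes a string column, value becomes a struct with fields 1, 2, 3.
val df = spark.read.json(Seq(json).toDS())

df.printSchema()
```

Because value is inferred as a struct, its keys become struct fields, which is what makes the value.* selection in the next step possible.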
First, select every field of the value struct:
df.select($"name", $"value.*")
This gives you:
+-----+----+----+----+
| name|   1|   2|   3|
+-----+----+----+----+
|srini|val1|val2|val3|
+-----+----+----+----+
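Next, turn the columns into rows. For this I usually define a helper function, kv:
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
// Builds one (k, v) struct per column name and explodes the array into one row per column
def kv(columnsToTranspose: Array[String]) = explode(array(columnsToTranspose.map {
  c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))
Then create an array with the columns to transpose: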
val pivotCols = Array("1", "2", "3")
Finally, apply the function to the DataFrame from before:
df.select($"name", $"value.*")
.withColumn("kv", kv(pivotCols))
.select($"name", $"kv.k" as "key", $"kv.v" as "value")
Result:
+-----+---+-----+
| name|key|value|
+-----+---+-----+
|srini|  1| val1|
|srini|  2| val2|
|srini|  3| val3|
+-----+---+-----+
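Edit
If you do not want to list the columns to pivot by hand, you can use an intermediate DataFrame and take every column except the first (name), as follows: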
val dfIntermediate = df.select($"name", $"value.*")
dfIntermediate.withColumn("kv", kv(dfIntermediate.columns.tail))
.select($"name", $"kv.k" as "key", $"kv.v" as "value")
You get exactly the same result:
+-----+---+-----+
| name|key|value|
+-----+---+-----+
|srini|  1| val1|
|srini|  2| val2|
|srini|  3| val3|
+-----+---+-----+
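Taking columns.tail relies on name being the first column. As a variation, the field names can be read from the schema of the value struct instead. The following is a sketch rather than the answer's own code; it assumes a local SparkSession, and the names valueFields and result are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
import org.apache.spark.sql.types.StructType

// Assumption: a throwaway local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("schema-unpivot").getOrCreate()
import spark.implicits._

val json = """{"name":"srini","value":{"1":"val1","2":"val2","3":"val3"}}"""
val df = spark.read.json(Seq(json).toDS())

// Read the nested field names from the inferred schema instead of hard-coding them.
val valueFields: Array[String] =
  df.schema("value").dataType.asInstanceOf[StructType].fieldNames

// One (k, v) struct per field, exploded into one row per field.
// This assumes every field of value has the same type (all strings here).
val result = df
  .select(
    $"name",
    explode(array(valueFields.map { f =>
      struct(lit(f).alias("k"), col("value").getField(f).alias("v"))
    }: _*)).alias("kv")
  )
  .select($"name", $"kv.k".as("key"), $"kv.v".as("value"))

result.show()
```

Using getField on the struct avoids flattening value.* first, so the position of the other columns no longer matters.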
Edit 2
The same approach works for the new example; only the columns you read and pivot change:
val pivotColumns = Array("HnfoID", "Index", "Monitor_Type", "Vendor_Data")
df.select("file_path", "file_name", "received_time", "obj_cls", "obj_cls_inst", "relation_tree", "s_tag", "Monitor.*")
  .withColumn("kv", kv(pivotColumns))
  .select($"file_path", $"file_name", $"received_time", $"obj_cls", $"obj_cls_inst", $"relation_tree", $"s_tag", $"kv.k" as "attribute_name", $"kv.v" as "attribute_value")
  .show
+---------------+--------------------+-------------+-------+------------+--------------------+-------+--------------+---------------+
|      file_path|           file_name|received_time|obj_cls|obj_cls_inst|       relation_tree|  s_tag|attribute_name|attribute_value|
+---------------+--------------------+-------------+-------+------------+--------------------+-------+--------------+---------------+
|AAA/BBB.CCC.zip|AAA_2020020212275...|   2020-03-31|Monitor|     Monitor|Source~>HD_Info~>...|ABC1234|        HnfoID|       650FEC74|
|AAA/BBB.CCC.zip|AAA_2020020212275...|   2020-03-31|Monitor|     Monitor|Source~>HD_Info~>...|ABC1234|         Index|              0|
|AAA/BBB.CCC.zip|AAA_2020020212275...|   2020-03-31|Monitor|     Monitor|Source~>HD_Info~>...|ABC1234|  Monitor_Type| Lenovo Monitor|
|AAA/BBB.CCC.zip|AAA_2020020212275...|   2020-03-31|Monitor|     Monitor|Source~>HD_Info~>...|ABC1234|   Vendor_Data|          58F5Y|
+---------------+--------------------+-------------+-------+------------+--------------------+-------+--------------+---------------+
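In the question, the second example starts from a DataFrame with a single string column, json_file, rather than an already parsed DataFrame. One possible way to handle that directly is to pull the nested object out with get_json_object, parse it as a map with from_json, and explode the map. This is an alternative technique, not the kv approach above. The sketch assumes a local SparkSession, abridges the sample row to two of the top-level fields, and uses the illustrative names inputDf, attrs and unpivoted:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json, get_json_object}
import org.apache.spark.sql.types.{MapType, StringType}

// Assumption: a throwaway local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("json-map-explode").getOrCreate()
import spark.implicits._

// Stand-in for the question's input: one string column named json_file
// (abridged here to two of the top-level fields plus the nested Monitor object).
val inputDf = Seq(
  """{"file_path":"AAA/BBB.CCC.zip","s_tag":"ABC1234","Monitor":{"Index":"0","Vendor_Data":"58F5Y","Monitor_Type":"Lenovo Monitor","HnfoID":"650FEC74"}}"""
).toDF("json_file")

// Extract the nested object as a JSON string, then parse it as map<string,string>.
val attrs = from_json(
  get_json_object($"json_file", "$.Monitor"),
  MapType(StringType, StringType)
)

// Exploding a map yields two columns (key and value), renamed here in one step.
val unpivoted = inputDf.select(
  get_json_object($"json_file", "$.file_path").as("file_path"),
  get_json_object($"json_file", "$.s_tag").as("s_tag"),
  explode(attrs).as(Seq("attribute_name", "attribute_value"))
)

unpivoted.show(false)
```

With a map there is no need to know the attribute names in advance, at the cost of treating every attribute value as a string.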