首页 > 解决方案 > 如何使用通过旋转列生成的从 json 值生成的模式创建新的数据框

问题描述

我有一个具有以下架构的数据框。

root
 |-- id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)

它首先以“值”列为中心。

val pivot_df = df.groupBy("id","key").pivot("type").agg(first("value"))

“value”列的数据可能包含嵌套的 json 。所以早些时候我正在转换整个“价值”列以获得所需的架构。

val schema = spark.read.json(df.select("value").as[String]).schema

val res_df = df.select($"id",$"type",$"key",from_json($"value",schema).as("s")).select("id","type","key","s.*")

但现在我需要先将它放在“值”列数据上,然后从所有新生成的列的 json 中读取。有人可以建议应该应用哪些代码更改?

标签: scalaapache-sparkpysparkapache-spark-sqlspark-streaming

解决方案


推荐阅读