json - Unable to create the column and value from the JSON nested key Value Pair using Spark/Scala
问题描述
I am working on converting the JSON which has nested key value pair to automatically create the columns for the keys and populate the rows for the values. I don’t want to create the schema as the no of columns (keys) will differ for each of the file. I am using Spark version 2.3 and Scala version 2.11.8. I am not a scala expert and just started my hands on Scala, so appreciate your inputs to get this resolved.
Here is the sample JSON format
{"RequestID":"9883a6d0-e002-4487-88a6-c92f6a504d72","OverallStatus":"OK","ele":[{"Name":"UUID","Value":"53f93df3-6528-4d42-a7f5-2876535d4982"},{"Name":"id"},{"Name":"opt_newsletter_email","Value":"boutmathieu@me.com"},{"Name":"parm1","Value":"secure.snnow.ca/orders/summary"},{"Name":"parm2","Value":"fromET"},{"Name":"parm3","Value":"implied"},{"Name":"parm4"},{"Name":"subscribed","Value":"True"},{"Name":"timestamp","Value":"8/6/2019 4:59:00 PM"},{"Name":"list_id","Value":"6"},{"Name":"name","Value":"Event Alerts"},{"Name":"email","Value":"boutmathieu@me.com"},{"Name":"newsletterID","Value":"sports:snnow:event"},{"Name":"subscribeFormIdOrURL"},{"Name":"unsubscribeTimestamp","Value":"8/14/2021 4:58:56 AM"}]}
{"RequestID":"9883a6d0-e002-4487-88a6-c92f6a504d72","OverallStatus":"OK","ele":[{"Name":"UUID","Value":"53f93df3-6528-4d42-a7f5-2876535d4982"},{"Name":"id"},{"Name":"opt_newsletter_email","Value":"boutmathieu@me.com"},{"Name":"parm1","Value":"secure.snnow.ca/orders/summary"},{"Name":"parm2","Value":"fromET"},{"Name":"parm3","Value":"implied"},{"Name":"parm4"},{"Name":"subscribed","Value":"True"},{"Name":"timestamp","Value":"8/6/2019 4:59:00 PM"},{"Name":"list_id","Value":"7"},{"Name":"name","Value":"Partner & Sponsored Offers"},{"Name":"email","Value":"boutmathieu@me.com"},{"Name":"newsletterID","Value":"sports:snnow:affiliate"},{"Name":"subscribeFormIdOrURL"},{"Name":"unsubscribeTimestamp","Value":"8/14/2021 4:58:56 AM"}]}
Expected output enter image description here
This is my code.
val newDF = spark.read.json("408d392-8c50-425a-a799-355f1783e0be-c000.json")
scala> newDF.printSchema
root
|-- OverallStatus: string (nullable = true)
|-- RequestID: string (nullable = true)
|-- ele: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
| | |-- Value: string (nullable = true)
val jsonDF = newDF.withColumn("colNames", explode($"ele")).select($"RequestID", ($"ColNames"))
scala> jsonDF.printSchema
root
|-- RequestID: string (nullable = true)
|-- setting: struct (nullable = true)
| |-- Name: string (nullable = true)
| |-- Value: string (nullable = true)
val finalDF=jsonDF.groupBy($"RequestID").pivot("ColNames.name").agg("ColNames.value")
---------------------------------------------------------------------------------------
I am getting this error while creating the finalDF
<console>:39: error: overloaded method value agg with alternatives:
(expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame <and>
(exprs: java.util.Map[String,String])org.apache.spark.sql.DataFrame <and>
(exprs: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame <and>
(aggExpr: (String, String),aggExprs: (String, String)*)org.apache.spark.sql.DataFrame
cannot be applied to (String)
val finalDF=jsonDF.groupBy($"RequestID").pivot("ColNames.name").agg("ColNames.value")
Any help would be greatly appreciated
解决方案
你快到了,agg
函数可以通过以下方式声明:
agg(aggExpr: (String, String)) -> agg("age" -> "max")
agg(exprs: Map[String, String]) -> agg(Map("age" -> "max")
agg(expr: Column) -> agg(max($"age"))
代替
agg("ColNames.value")
您应该使用上述示例之一。
例如
import org.apache.spark.sql.functions._
jsonDF.groupBy($"RequestID").pivot("ColNames.name")
.agg(collect_list($"ColNames.value"))
推荐阅读
- javascript - 如何传递方法来响应上下文 api 并在类组件中使用它们
- php - 从 php 文件问题流式传输视频
- react-native - 每次安装新模块后,aliyun-oss-react-native 的 builder.gradle 都会重置回原来的样子
- reactjs - 滚动时隐藏到达 UI 工具提示
- dita - DITA-OT 3.6.1:DOTJ083E 大写错误
- javascript - 视频中看到的灵活横幅
- .htaccess - 在 htaccess 中使用通配符进行 301 重定向不起作用
- python - 如何从列中提取字符串的某些部分以在 Pandas 中创建其他列
- tensorflow - 将连接层中的单个权重设置为不可训练
- numpy - 从字符串输入列表中构造一个 numpy 数组