json - How to group by a specific column and get each entire row as a JSON string in Scala Spark?
Problem description
I am trying to collect a dataset in JSON format:
val df = spark.sql("select invn_ctl_nbr,cl_id,department from pi_prd.table1 where batch_run_dt='20190101' and batchid = '20190101001' limit 10").toJSON.rdd
Once collected, the result is an Array[String]:
Array({"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"}
{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"}
{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"}
{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"})
In addition, I am trying to group the data so that the result has the following shape:
Map<key, List<data>>
For example:
Map<AK=[{"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"},{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"}],AF=[{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"},{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"}]>
I have already tried the solution provided in the link. The code I used to get the desired result is:
import org.json4s._
import org.json4s.jackson.Serialization.read
case class cC(invn_ctl_nbr: String,cl_id: String,department: String)
val rdd_new = df.map(m => {
  implicit val formats = DefaultFormats
  // Parse each JSON string into the case class to read its grouping key
  val parsedObj = read[cC](m)
  (parsedObj.cl_id, m)
})
rdd_new.collect.groupBy(_._1).map(m => (m._1,m._2.map(_._2).toList))
But it gives me the following error:
org.json4s.package$MappingException: Parsed JSON values do not match with class constructor
args=
arg types=
constructor=public cC($iw,java.lang.String,java.lang.String,java.lang.String)
The fields I declared in the case class match what comes from the RDD, so I am not sure what I am missing here. Can anyone help with this? It would be a great help. Thank you.
Solution
You can use groupBy together with struct, collect_list, and to_json to get the result you want.
Here the DataFrame df stands in for your spark.sql("select query").
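import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}
import spark.implicits._ // needed for toDF and the $"..." column syntax

val df = Seq(
  ("1", "AK", "Dept1"),
  ("2", "AF", "Dept1"),
  ("3", "AF", "Dept2"),
  ("4", "AK", "Dept3")
).toDF("invn_ctl_nbr", "cl_id", "department")

// Pack every row into a struct, collect the structs per cl_id,
// and serialize each collected list as a single JSON array string
val result = df.groupBy($"cl_id")
  .agg(to_json(collect_list(struct(df.columns.map(col(_)): _*))))
  .rdd.map(x => (x.getString(0), x.getString(1))).collectAsMap()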
Output (result):
Map(AF -> [{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"},{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"}], AK -> [{"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"},{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"}])
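The result above holds one JSON array string per key. If a literal Map<key, List<data>> is wanted, with one JSON string per row, a possible variant is to swap the order of to_json and collect_list. The following is only a minimal sketch under assumptions: the object name GroupRowsAsJson, the method groupedJson, the column alias "rows", and the local SparkSession settings are all hypothetical and not part of the original answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}

// Hypothetical wrapper object, used here only to keep the sketch self-contained
object GroupRowsAsJson {

  // Groups the sample rows by cl_id and returns one JSON string per row
  def groupedJson(spark: SparkSession): Map[String, List[String]] = {
    import spark.implicits._

    // Same sample data as in the answer above
    val df = Seq(
      ("1", "AK", "Dept1"),
      ("2", "AF", "Dept1"),
      ("3", "AF", "Dept2"),
      ("4", "AK", "Dept3")
    ).toDF("invn_ctl_nbr", "cl_id", "department")

    df.groupBy($"cl_id")
      // Serialize each row first, then collect the JSON strings per key
      .agg(collect_list(to_json(struct(df.columns.map(col(_)): _*))).as("rows"))
      .collect()
      .map(r => r.getString(0) -> r.getSeq[String](1).toList)
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // Assumed local session; in spark-shell the existing `spark` can be passed instead
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("group-rows-as-json")
      .getOrCreate()
    try groupedJson(spark).foreach { case (k, v) => println(s"$k -> $v") }
    finally spark.stop()
  }
}
```

Note that collect_list does not guarantee the order of the rows within each group, so the lists should be treated as unordered unless the data is sorted explicitly.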
Hope this helps!