How to group by a specific column and get the whole row as a JSON string in Scala Spark?

Problem description

I am trying to collect a dataset in JSON format:

val df = spark.sql("select invn_ctl_nbr,cl_id,department from pi_prd.table1 where batch_run_dt='20190101' and batchid = '20190101001' limit 10").toJSON.rdd

After collecting, the result is an Array[String]:

Array({"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"},
{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"},
{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"},
{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"})

Next, I am trying to group the data so that the result has the following format:

Map<key, List<data>>

For example:

Map<AK=[{"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"},{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"}],AF=[{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"},{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"}]>
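In Scala terms the target is a Map[String, List[String]]: the key is cl_id and each value is the list of row JSON strings for that key. A minimal pure-Scala sketch of that grouping on the driver, assuming the four sample rows above have already been turned into (cl_id, json) pairs (the object name DriverSideGrouping is hypothetical), looks like this:

```scala
// Hypothetical, standard-library-only sketch of the desired grouping.
object DriverSideGrouping {
  // (cl_id, row-as-JSON) pairs, hand-copied from the sample output above.
  val pairs: Array[(String, String)] = Array(
    "AK" -> """{"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"}""",
    "AF" -> """{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"}""",
    "AF" -> """{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"}""",
    "AK" -> """{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"}"""
  )

  // Group by the key, then keep only the JSON strings in each group.
  val grouped: Map[String, List[String]] =
    pairs.groupBy(_._1).map { case (key, kvs) => key -> kvs.map(_._2).toList }

  def main(args: Array[String]): Unit =
    grouped.foreach { case (k, v) => println(s"$k -> ${v.mkString("[", ",", "]")}") }
}
```

On real data this only works after collecting every row to the driver, so it is suitable for small results such as the limit 10 query above.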

I have tried the solution provided in the link. The code I used to get the required result is:

import org.json4s._
import org.json4s.jackson.Serialization.read

case class cC(invn_ctl_nbr: String,cl_id: String,department: String)

val rdd_new = df.map(m => {
  implicit val formats = DefaultFormats
  val parsedObj = read[cC](m)
  // key each JSON string by its cl_id (cC has no srk_clm_id field)
  (parsedObj.cl_id, m)
})

rdd_new.collect.groupBy(_._1).map(m => (m._1,m._2.map(_._2).toList))

But it gives me the following error:

org.json4s.package$MappingException: Parsed JSON values do not match with class constructor
args=
arg types=
constructor=public cC($iw,java.lang.String,java.lang.String,java.lang.String)

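The fields I specified in the case class match what comes from the RDD, so I am not sure what exactly I am missing here. Can anyone help with this? It would be a great help. Thank you.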
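The constructor line in the error, public cC($iw,java.lang.String,java.lang.String,java.lang.String), most likely explains the failure: the spark-shell wraps each input in generated $iw classes, so a case class declared there becomes an inner class whose constructor takes the enclosing wrapper instance as a hidden first argument, which json4s cannot supply from the JSON. The usual workaround is to declare the case class in compiled code (top level, or inside a standalone object) rather than in the shell. The following sketch reproduces the same constructor shape with plain Java reflection; Outer, TopLevel and OuterParamDemo are hypothetical names, and Inner deliberately reads a member of Outer so that the outer reference is always required:

```scala
// Hypothetical classes, for illustration only.
class Outer {
  val prefix: String = "outer-"
  // Inner case class that reads a member of its enclosing instance,
  // so the compiler passes that instance as the first constructor argument.
  case class Inner(value: String) {
    def tagged: String = prefix + value
  }
}

object TopLevel {
  // Declared inside a top-level object: there is no enclosing instance to capture.
  case class Record(value: String)
}

object OuterParamDemo {
  // Parameter types of the single public constructor of a class.
  def ctorParams(c: Class[_]): Seq[Class[_]] =
    c.getConstructors.head.getParameterTypes.toSeq

  def main(args: Array[String]): Unit = {
    println(ctorParams(classOf[Outer#Inner]).map(_.getName))      // Outer, then String
    println(ctorParams(classOf[TopLevel.Record]).map(_.getName))  // String only
  }
}
```

The first constructor has the same shape as cC in the error (an extra leading argument), while the second takes only the declared fields, which is what json4s expects when extracting a case class.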

Tags: json, scala, apache-spark

Solution


You can use groupBy together with struct, to_json and collect_list to get the result you want.

Here the DataFrame df stands in for the result of your spark.sql("select query"):

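import org.apache.spark.sql.functions._
import spark.implicits._  // already imported automatically in spark-shell; needed for toDF and $"..."

val df = Seq(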
  ("1", "AK", "Dept1"),
  ("2", "AF", "Dept1"),
  ("3", "AF", "Dept2"),
  ("4", "AK", "Dept3")
).toDF("invn_ctl_nbr","cl_id","department")


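// one row per cl_id: collect the rows as structs, then serialize the whole list as one JSON array string
val result = df.groupBy($"cl_id")
  .agg(to_json(collect_list(struct(df.columns.map(col(_)) : _*))))
  .rdd.map(x => (x.getString(0), x.getString(1))).collectAsMap()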

Output (result):

Map(AF -> [{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"},{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"}], AK -> [{"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"},{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"}])
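The code above gives one JSON array string per key. If you want the literal Map<key, List<data>> from the question, with each row kept as its own JSON string, one option is to swap the order: apply to_json to each row's struct first, then collect_list the strings. This is a sketch assuming a Spark 2.x/3.x dependency and a local master; GroupRowsAsJson, groupRows and run are hypothetical names:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}

object GroupRowsAsJson {
  // Serialize every row to its own JSON string, then gather those strings per cl_id.
  def groupRows(df: DataFrame): Map[String, List[String]] =
    df.groupBy(col("cl_id"))
      .agg(collect_list(to_json(struct(df.columns.map(col(_)): _*))).as("rows"))
      .collect()
      .map(r => r.getString(0) -> r.getSeq[String](1).toList)
      .toMap

  // Builds the sample DataFrame from the answer on a local SparkSession.
  def run(): Map[String, List[String]] = {
    val spark = SparkSession.builder().master("local[1]").appName("group-rows-as-json").getOrCreate()
    import spark.implicits._
    try {
      val df = Seq(
        ("1", "AK", "Dept1"),
        ("2", "AF", "Dept1"),
        ("3", "AF", "Dept2"),
        ("4", "AK", "Dept3")
      ).toDF("invn_ctl_nbr", "cl_id", "department")
      groupRows(df)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

collect_list does not guarantee the order of elements within a group, so sort each list afterwards if the order matters.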

Hope this helps!

