首页 > 解决方案 > 使用别名 Spark Scala 对同一数据帧中的不同列执行多个聚合

问题描述

这是基于来自以下链接的 Sumit 回答的问题

[ Spark SQL:将聚合函数应用于列列表

这是详细信息

val Claim1 = StructType(Seq(StructField("pid", StringType, true),StructField("diag1", StringType, 
true),StructField("diag2", StringType, true), StructField("allowed", IntegerType, true), 
StructField("allowed1", IntegerType, true)))

val claimsData1 = Seq(("PID1", "diag1", "diag2", 100, 200), ("PID1", "diag2", "diag3", 300, 600), 
("PID1", "diag1", "diag5", 340, 680), ("PID2", "diag3", "diag4", 245, 490), ("PID2", "diag2", 
"diag1", 124, 248))

val claimRDD1 = sc.parallelize(claimsData1)
val claimRDDRow1 = claimRDD1.map(p => Row(p._1, p._2, p._3, p._4, p._5))
val claimRDD2DF1 = sqlContext.createDataFrame(claimRDDRow1, Claim1)
val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")
claimRDD2DF1.groupBy("pid").agg(exprs) show false

但它没有为命名新列提供别名,我有一个数据框,我需要对一组列执行多个聚合,它可以是多组列的 sum、avg、min、max,所以请告诉我如果有办法解决上述问题或有更好的方法来实现这一点?

提前致谢。

标签: scalaapache-sparkapache-spark-sql

解决方案


您的代码稍作修改即可工作,诀窍是调用callUDF将聚合函数作为 String 并且可以别名:

val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")

val aggExpr = exprs.map{case (k,v)  => callUDF(v,col(k)).as(k)}.toList

claimRDD2DF1.groupBy("pid").agg(aggExpr.head,aggExpr.tail:_*)
  .show()

或者,如果您可以将聚合指定为函数对象,则无需使用callUDF

val aggExpr = Seq(
  ("allowed",sum(_:Column)),
  ("allowed1", avg(_:Column))
)
  .map{case (k,v)  => v(col(k)).as(k)}


claimRDD2DF1.groupBy("pid").agg(aggExpr.head,aggExpr.tail:_*)
  .show()

两个版本都给

+----+-------+-----------------+
| pid|allowed|         allowed1|
+----+-------+-----------------+
|PID1|    740|493.3333333333333|
|PID2|    369|            369.0|
+----+-------+-----------------+

推荐阅读