dataframe - In Apache Spark SQL, how can I remove duplicate rows when using collect_list in a window function?
Problem description
I have the following dataframe:
+----+-----+----+--------+
|year|month|item|quantity|
+----+-----+----+--------+
|2019|1 |TV |8 |
|2019|2 |AC |10 |
|2018|1 |TV |2 |
|2018|2 |AC |3 |
+----+-----+----+--------+
Using a window function, I want to get the output below:
val partitionWindow = Window.partitionBy("year").orderBy("month")
val itemsList= collect_list(struct("item", "quantity")).over(partitionWindow)
df.select(col("year"), itemsList as "items")
Expected output:
+----+-------------------+
|year|items |
+----+-------------------+
|2019|[[TV, 8], [AC, 10]]|
|2018|[[TV, 2], [AC, 3]] |
+----+-------------------+
However, when I use the window function, I get a duplicated row for each item:
Current output:
+----+-------------------+
|year|items |
+----+-------------------+
|2019|[[TV, 8]] |
|2019|[[TV, 8], [AC, 10]]|
|2018|[[TV, 2]] |
|2018|[[TV, 2], [AC, 3]] |
+----+-------------------+
I would like to know the best way to remove these duplicate rows.
Solution
I believe the interesting part here is that the aggregated list of items must be sorted by month. So I wrote the code in three approaches:
Creating the sample dataset:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
case class data(year : Int, month : Int, item : String, quantity : Int)
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val inputDF = spark.createDataset(Seq(
data(2018, 2, "AC", 3),
data(2019, 2, "AC", 10),
data(2019, 1, "TV", 2),
data(2018, 1, "TV", 2)
)).toDF()
Approach 1: aggregate month, item and quantity into a list, then sort the items by month with a UDF:
case class items(item : String, quantity : Int)
def getItemsSortedByMonth(itemsRows : Seq[Row]) : Seq[items] = {
if (itemsRows == null || itemsRows.isEmpty) {
null
}
else {
itemsRows.sortBy(r => r.getAs[Int]("month"))
.map(r => items(r.getAs[String]("item"), r.getAs[Int]("quantity")))
}
}
val itemsSortedByMonthUDF = udf(getItemsSortedByMonth(_: Seq[Row]))
val outputDF = inputDF.groupBy(col("year"))
.agg(collect_list(struct("month", "item", "quantity")).as("items"))
.withColumn("items", itemsSortedByMonthUDF(col("items")))
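As a side note (not part of the original answer): on Spark 2.4+ the UDF can be avoided entirely with built-in functions. `sort_array` orders an array of structs field by field, so putting `month` first sorts by month, and the SQL `transform` function then strips the month from each struct. A sketch, assuming the same `inputDF`:

```scala
import org.apache.spark.sql.functions._

// sort_array compares structs field by field, so "month" (the first field)
// drives the ordering; transform (Spark 2.4+) then drops month from each struct
val noUdfDF = inputDF
  .groupBy("year")
  .agg(sort_array(collect_list(struct("month", "item", "quantity"))).as("items"))
  .withColumn("items",
    expr("transform(items, x -> named_struct('item', x.item, 'quantity', x.quantity))"))
```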
Approach 2: using a window function
import org.apache.spark.sql.expressions.Window

val monthWindowSpec = Window.partitionBy("year").orderBy("month")
val rowNumberWindowSpec = Window.partitionBy("year").orderBy("row_number")
val runningList = collect_list(struct("item", "quantity")).over(rowNumberWindowSpec)
val tempDF = inputDF
// using row_number for continuous ranks if there are multiple items in the same month
.withColumn("row_number", row_number().over(monthWindowSpec))
.withColumn("items", runningList)
.drop("month", "item", "quantity")
tempDF.persist()
val yearToSelect = tempDF.groupBy("year").agg(max("row_number").as("row_number"))
val outputDF = tempDF.join(yearToSelect, Seq("year", "row_number")).drop("row_number")
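A variant that stays closest to the question's original window code, sketched here as an assumption rather than taken from the answer: the duplicates appear because an ordered window defaults to a running frame (unbounded preceding to current row). Widening the frame to the whole partition makes `collect_list` return the complete, month-ordered list on every row, after which one row per year can be kept:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// a full-partition frame makes the collected list identical on every row of a year
val fullWindow = Window
  .partitionBy("year")
  .orderBy("month")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val dedupedDF = inputDF
  .withColumn("items", collect_list(struct("item", "quantity")).over(fullWindow))
  .select("year", "items")
  .dropDuplicates("year")
```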
Edit: added a third approach for posterity, using the Dataset API - groupByKey and mapGroups:
// the .as[data] encoding step can be skipped if inputDF had not been converted to a DataFrame of Row objects
val outputDF = inputDF.as[data].groupByKey(_.year).mapGroups{ case (year, rows) =>
val itemsSortedByMonth = rows.toSeq.sortBy(_.month).map(s => items(s.item, s.quantity))
(year, itemsSortedByMonth)
}.toDF("year", "items")
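Any of the `outputDF` variants above can be checked with a quick display (the exact rendering of the nested structs varies slightly between Spark versions, and the quantities here come from the sample dataset, not the table in the question):

```scala
// sort so the year ordering in the output is deterministic
outputDF.orderBy(desc("year")).show(false)
```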