首页 > 解决方案 > 在 Spark 中与 groupBy 一起使用领先和滞后函数的可能性

问题描述

我很有趣有没有办法使用领先\滞后来计算这样的东西

第一步:我有一个数据框

+----+-----------+------+
| id | timestamp | sess |
+----+-----------+------+
| xx | 1         | A    |
+----+-----------+------+
| yy | 2         | A    |
+----+-----------+------+
| zz | 1         | B    |
+----+-----------+------+
| yy | 3         | B    |
+----+-----------+------+
| tt | 4         | B    |
+----+-----------+------+

我想通过 session_id 收集特定 id 分区之前的 id

+----+---------+
| id | id_list |
+----+---------+
| yy | [xx,zz] |
+----+---------+
| xx | []      |
+----+---------+
| zz | []      |
+----+---------+
| tt | [yy]    |
+----+---------+

标签: apache-spark

解决方案


您可以在问题中提到window的列sess和ID 上创建一个。lag然后您可以使用groupBy聚合函数collect_list来获取输出。

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"sess").orderBy($"timestamp")
val df1 = df.withColumn("lagged", lag($"id", 1).over(w))
 df1.select("id", "lagged").groupBy($"id").agg(collect_list($"lagged").as("id_list")).show

//+---+--------------------+
//| id|             id_list|
//+---+--------------------+
//| tt|                [yy]|
//| xx|                  []|
//| zz|                  []|
//| yy|            [zz, xx]|
//+---+--------------------+

推荐阅读