首页 > 解决方案 > 按条件分组的 ROW_NUMBER()

问题描述

假设我们有一个带有“事件”列的 DataFrame:

events
A
b
c
d
e
A
b
c
d
A
f

我想通过这样的拆分获得 WindowGroups 或只是带有 row_number() 的新列

events
A
b
c
d
e
f
g
----- split here ---
A
b
c
d
----- split here ---
A
f

所以我想将“事件”列中“A”之间的所有行放到一组中。怎么做?我觉得可以用 Window 函数来完成。

标签: sqlscalaapache-sparkwindow-functions

解决方案


最后,我自己找到了解决方案。这里是:

import org.apache.spark.sql.expressions.Window
 val windowIndex = Window.partitionBy().orderBy("time")

val result = eventWithTime
  .withColumn("groupId",
    when($"events" === "A", row_number over windowIndex).otherwise(null))
  .withColumn("groupId", last("groupId", ignoreNulls = true) over windowIndex)
  .filter($"groupId".isNotNull)

(我使用列“时间”只是为了对示例中的事件进行排序)

这里的想法是找到所有带有“A”的“事件”并用唯一的 id 标记它们。我是使用row_numberWindow.partitionBy()功能做到的。(也许使用起来会更好,monotonically_increasing_id但我有很多数据,并且有一些正确工作的假设monotonically_increasing_id)。之后,我使用last了具有相同窗口的功能。这里重要的是将 ignoreNulls 设置为“true”。这样,所有空值都将在当前行之前用第一个非空值填充。然后我只是在第一个“A”之前删除第一行,因为它们仍然是空值。

前任。:

  1. 在任何操作之前
events
A
b
c
d
e
A
b
c
d
A
f
  1. 为所有“A”分配唯一的 id(否则为空)
events | groupId
A      | 1
b      | null
c      | null
d      | null
e      | null
A      | 2
b      | null
c      | null
d      | null
A      | 3
f      | null
  1. 使用最后一个非空值填充空值
events | groupId
A      | 1
b      | 1
c      | 1
d      | 1
e      | 1
A      | 2
b      | 2
c      | 2
d      | 2
A      | 3
f      | 3

现在我们可以直接groupBypartitionBy通过 groupId 做我们想做的事。


推荐阅读