首页 > 解决方案 > Spark:在每个组中过滤

问题描述

我有一个像

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|first |
|     a|2020-01-01 10:51:00|second|
|     a|2020-01-01 12:49:00|first |
|     b|2020-01-01 12:44:00|second|
|     b|2020-01-01 12:46:00|first |
|     c|2020-01-01 12:46:00|third |
+------+-------------------+------+

我想删除所有行,对于每个组,标签first比 labelsecondthird. 例如,在 group中,应该删除a带有firstand的行,因为有一个带有标签的旧行。2020-01-01 12:49:00second

所需的输出将是:

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|first |
|     a|2020-01-01 10:51:00|second|
|     b|2020-01-01 12:44:00|second|
|     c|2020-01-01 12:46:00|third |
+------+-------------------+------+

具有分区依据的窗口函数group将在每个组内过滤,但是如何在标签上实现过滤器?

标签: apache-sparkapache-spark-sqlwindow-functions

解决方案


您可以使用不是“第一”的标签获取上一次,并使用该列进行过滤:

import org.apache.spark.sql.expressions.Window

val df2 = df.withColumn(
    "non_first_time", 
    last(
        when(col("label") =!= "first", col("time")), 
        true
    ).over(
        Window.partitionBy("group").orderBy("time")
    )
).filter("""
    label != 'first' or 
    (label = 'first' and (non_first_time > time or non_first_time is null))
""").drop("non_first_time")

df2.show
+-----+-------------------+------+
|group|               time| label|
+-----+-------------------+------+
|    c|2020-01-01 12:46:00| third|
|    b|2020-01-01 12:44:00|second|
|    a|2020-01-01 10:49:00| first|
|    a|2020-01-01 10:51:00|second|
+-----+-------------------+------+

推荐阅读