Spark function rangeBetween until a condition is met

Problem description

I have a dataset in this format:

+-----------------------------+------------------------------------+------------+-----------------+
|     timestamp               |uuid                                |storyTopic  |Type             |
+-----------------------------+------------------------------------+------------+-----------------+
|2019-04-15 11:21:03.362 -0400|9e0f3d00-cff7-3b76-89df-0d11c0addc91|TRY1        |Draft            |
|2019-04-15 11:21:06.547 -0400|null                                |TRY1        |Draft            |
|2019-04-15 11:21:06.617 -0400|e142e9bc-6587-34e3-9042-959b624f5   |Trial12345  |Original         |
|2019-04-15 11:21:08.196 -0400|null                                |TRY1        |Draft            |
|2019-04-15 11:21:22.855 -0400|null                                |TRY1        |Draft            |
|2019-04-15 11:23:36.108 -0400|null                                |TRY1        |Draft            |
|2019-04-15 11:23:36.139 -0400|null                                |TRY1        |Draft            |
|2019-04-15 11:23:50.311 -0400|null                                |TRY1        |Draft            |
|2019-04-15 15:21:08.196 -0400|null                                |TRY1        |Draft            |
|2019-04-15 15:21:12.617 -0400|e142e9bc-6587-34e3-9042-959b624f5   |Trial12345  |Original         |
+-----------------------------+------------------------------------+------------+-----------------+

If the storyTopic of the first row of the dataset is TRY1, I want to find the row for which:
1. the time difference is less than 5 seconds
2. uuid is not null

I tried to implement it as follows:

// create a time-difference column (epoch seconds)
df = df.withColumn("diff", functions.unix_timestamp(functions.col("timestamp")));
// create a window over a time range: 1 to 5 seconds after the current row
WindowSpec w = Window.orderBy("diff").rangeBetween(1L, 5L);
df = df.withColumn("Type", functions.when(
                functions.rank().over(w).equalTo(1)
                // check that uuid is not null
                .and(functions.first("uuid").over(w).isNotNull()),
                functions.first("storyTopic").over(w))
        .otherwise("storyTopic"));

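What happens is that it ignores the uuid.isNotNull() condition and takes the data from the second row, instead of from the third row, which satisfies both conditions.

How can I make sure that functions.first() picks a row whose uuid is not null?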

Tags: java, apache-spark, apache-spark-sql

Solution
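
One possible approach, offered as a sketch rather than a confirmed fix: Spark's functions.first has an overload first(Column e, boolean ignoreNulls). Wrapping storyTopic in functions.when(functions.col("uuid").isNotNull(), functions.col("storyTopic")) makes rows with a null uuid evaluate to null, so first(..., true).over(w) should skip them and return the storyTopic of the first row in the 1 to 5 second frame whose uuid is set. The plain-Java snippet below only simulates that frame logic on the first five rows of the sample table, with no Spark dependency. The Row class, the sampleRows and firstTopicWithUuid helpers, and the seconds values (counted from 11:21:00 and truncated to whole seconds) are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

class FirstNonNullInRange {
    // Hypothetical row type, for illustration only.
    static final class Row {
        final long seconds;       // timestamp truncated to whole seconds
        final String uuid;        // may be null
        final String storyTopic;

        Row(long seconds, String uuid, String storyTopic) {
            this.seconds = seconds;
            this.uuid = uuid;
            this.storyTopic = storyTopic;
        }
    }

    // First five rows of the sample table, already sorted by time.
    // Seconds are counted from 11:21:00 (an assumption to keep numbers small).
    static List<Row> sampleRows() {
        return Arrays.asList(
            new Row(3, "9e0f3d00-cff7-3b76-89df-0d11c0addc91", "TRY1"),
            new Row(6, null, "TRY1"),
            new Row(6, "e142e9bc-6587-34e3-9042-959b624f5", "Trial12345"),
            new Row(8, null, "TRY1"),
            new Row(22, null, "TRY1"));
    }

    // Mimics first(when(uuid is not null, storyTopic), ignoreNulls = true)
    // over a frame like rangeBetween(lo, hi): scans rows in time order and
    // returns the storyTopic of the first row that lies lo..hi seconds after
    // row i and has a non-null uuid, or null if there is none.
    static String firstTopicWithUuid(List<Row> rows, int i, long lo, long hi) {
        long base = rows.get(i).seconds;
        for (Row r : rows) {
            long delta = r.seconds - base;
            if (delta >= lo && delta <= hi && r.uuid != null) {
                return r.storyTopic;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Row> rows = sampleRows();
        for (int i = 0; i < rows.size(); i++) {
            System.out.println(i + " -> " + firstTopicWithUuid(rows, i, 1L, 5L));
        }
    }
}
```

For the first row (index 0) the simulation skips the second row, whose uuid is null, and returns Trial12345 from the third row, which is the behaviour the question asks for. The other four rows have no row with a non-null uuid 1 to 5 seconds later, so they yield null.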

