首页 > 解决方案 > Spark:数据帧的复杂操作

问题描述

我有以下格式的输入数据集:

+---+--------+----------+
| id|   refId| timestamp|
+---+--------+----------+
|  1|    null|1548944642|
|  1|29950529|1548937685|
|  2|27510720|1548944885|
|  2|27510720|1548943617|
+---+--------+----------+

session需要使用以下转换逻辑添加新列:

  1. 如果refId is null,则会话值为真。
  2. 如果id and refId are unique,则会话值为真。
  3. 如果id and refId are not unique和`timestamp 大于上一行,则会话值为真。时间戳之间的差异也应大于 60。
+---+--------+-------+----------+
| id|   refId|session| timestamp|
+---+--------+-------+----------+
|  1|    null|   true|1548944642|
|  1|29950529|   true|1548937685|
|  2|27510720|  false|1548943617|
|  2|27510720|   true|1548944885|
+---+--------+-------+----------+

我可以分别做 1 和 3 个条件,但不能做第 2 个。

  1. `data.withColumn("session", functions.when(data.col("refId").isNull(), true));
  2. 3.
WindowSpec w = Window.partitionBy("id, refid").orderBy(timestampDS.col("timestamp"));
functions.coalesce(timestampDS.col("timestamp").cast("long").$minus(functions.lag("timestamp", 1).over(w).cast("long")), functions.lit(0));

我的问题是如何满足第二个条件并同时实现所有 3 个转换。

标签: javaapache-sparkapache-spark-sql

解决方案


您可以使用窗口函数来分组 id 和 rfId 并按时间戳排序,然后添加一个排名列。最后,添加带有 when 的 session 列,否则添加 sql 函数。

import org.apache.spark.sql.expressions.{Window}
import org.apache.spark.sql.functions.{when, col, rank, lit, lag}
val win = Window.partitionBy("id", "refId").orderBy("timestamp")
val result = df
      .withColumn("previous", lag("timestamp", 1) over win)
      .withColumn("rank", rank() over win)
      .withColumn("session",
        when(col("refId").isNull || col("rank") === lit(1), true)
          .otherwise(false)
      )
      .withColumn("diff", col("timestamp") - col("previous"))

推荐阅读