java - Spark:数据帧的复杂操作
问题描述
我有以下格式的输入数据集:
+---+--------+----------+
| id| refId| timestamp|
+---+--------+----------+
| 1| null|1548944642|
| 1|29950529|1548937685|
| 2|27510720|1548944885|
| 2|27510720|1548943617|
+---+--------+----------+
session
需要使用以下转换逻辑添加新列:
- 如果
refId is null
,则会话值为真。 - 如果
id and refId are unique
,则会话值为真。 - 如果
id and refId are not unique
和`timestamp 大于上一行,则会话值为真。时间戳之间的差异也应大于 60。
+---+--------+-------+----------+
| id| refId|session| timestamp|
+---+--------+-------+----------+
| 1| null| true|1548944642|
| 1|29950529| true|1548937685|
| 2|27510720| false|1548943617|
| 2|27510720| true|1548944885|
+---+--------+-------+----------+
我可以分别做 1 和 3 个条件,但不能做第 2 个。
- `data.withColumn("session", functions.when(data.col("refId").isNull(), true)); 3.
WindowSpec w = Window.partitionBy("id, refid").orderBy(timestampDS.col("timestamp"));
functions.coalesce(timestampDS.col("timestamp").cast("long").$minus(functions.lag("timestamp", 1).over(w).cast("long")), functions.lit(0));
我的问题是如何满足第二个条件并同时实现所有 3 个转换。
解决方案
您可以使用窗口函数来分组 id 和 rfId 并按时间戳排序,然后添加一个排名列。最后,添加带有 when 的 session 列,否则添加 sql 函数。
import org.apache.spark.sql.expressions.{Window}
import org.apache.spark.sql.functions.{when, col, rank, lit, lag}
val win = Window.partitionBy("id", "refId").orderBy("timestamp")
val result = df
.withColumn("previous", lag("timestamp", 1) over win)
.withColumn("rank", rank() over win)
.withColumn("session",
when(col("refId").isNull || col("rank") === lit(1), true)
.otherwise(false)
)
.withColumn("diff", col("timestamp") - col("previous"))
推荐阅读
- angular - Mousevent方法不触发
- perl - 执行 shell 脚本 Sed 和 Perl:使用 & 作为匹配字符串
- python - 哪个是真正的 python3 可执行文件的路径?
- c# - c#中的“operator true”是否正好有两个地方可以使用?
- spring - 使用 Spring/Java 解析管道分隔文件并将数据存储到数据库中
- javascript - 用JS改变元素的字体大小
- php - 使用php获取alexa排名
- ibm-cloud - cf UI 控制台的 API 端点 https://console.w3ibm.bluemix.net/
- jenkins - 从 Jenkins build 为 pull request 启用 Github 状态检查:抱歉,我们在上周找不到此存储库的任何状态检查
- vba - 子/函数调用中的括号和逗号分隔的参数?