scala - 根据列上的条件过滤最新记录
问题描述
这是我的输入数据框
DataPartition TimeStamp OrganizationId SegmentId GeographicSegment_geographyId IsSubtracted Sequence FFAction|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 27 100002 false 1 O|!|
Japan 2018-05-29T07:52:45+00:00 4295876592 23 null null null D|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 28 100025 false 1 O|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 14 null null null D|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 26 100105 false 1 O|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 6 100131 false 2 O|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 27 112018 false 2 O|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 11 null null null D|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 6 100023 false 1 O|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 25 null null null D|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 29 100029 false 1 O|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 24 null null null D|!|
Japan 2018-05-29T07:52:45+00:00 4295876592 22 null null null D|!|
Japan 2018-05-29T09:11:00+00:00 4295876592 27 100020 false 2 O|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 7 100148 false 1 O|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 21 null null null D|!|
逻辑是,对于相同的OrganizationId
列,SegmentId
我需要根据 TimeStamp 列的顺序获取最新记录,但有一个条件条件是如果相同OrganizationId
并且SegmentId
我们得到一个TimeStamp
,那么我需要得到它,但如果我得到多个时间戳行那么只有我需要得到最新的。例如,我们有 3 行SegmentId
27
Japan 2018-05-29T09:17:18+00:00 4295876592 27 100002 false 1 O|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 27 112018 false 2 O|!|
Japan 2018-05-29T09:11:00+00:00 4295876592 27 100020 false 2 O|!|
所以在上述情况下,我们有相同OrganizationId
的SegmentId
但有两个TimeStamp
,所以我需要得到最新的两个,预期的输出将是
Japan 2018-05-29T09:17:18+00:00 4295876592 27 100002 false 1 O|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 27 112018 false 2 O|!|
但在另一种情况下,我们有两条记录SegmentId
6
Japan 2018-05-29T09:17:18+00:00 4295876592 6 100131 false 2 O|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 6 100023 false 1 O|!|
在这种情况下OrganizationId
,也SegmentId
相同,但我们只有时间戳,所以我需要保留两列
最后这是我的输出数据框
DataPartition TimeStamp OrganizationId SegmentId GeographicSegment_geographyId IsSubtracted Sequence FFAction|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 27 100002 false 1 O|!|
Japan 2018-05-29T07:52:45+00:00 4295876592 23 null null null D|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 28 100025 false 1 O|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 14 null null null D|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 26 100105 false 1 O|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 6 100131 false 2 O|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 27 112018 false 2 O|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 11 null null null D|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 6 100023 false 1 O|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 25 null null null D|!|
Japan 2018-05-29T09:17:18+00:00 4295876592 29 100029 false 1 O|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 24 null null null D|!|
Japan 2018-05-29T07:52:45+00:00 4295876592 22 null null null D|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 7 100148 false 1 O|!|
Japan 2018-05-29T08:05:17+00:00 4295876592 21 null null null D|!|
这是我正在尝试使用的代码,但是当我使用它时,我会错过具有相同 SegmentId 且具有相同时间戳的记录
val windowSpec3 = Window.partitionBy("OrganizationId", "SegmentId", "TimeStamp").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey = latestForEachKey2.withColumn("rank", row_number().over(windowSpec3)).filter($"rank" === 1).drop("rank")
解决方案
从 partitionBy 中删除 Timestamp 并尝试以下操作:
val windowSpec3 = Window.partitionBy("OrganizationId", "SegmentId")
.orderBy(unix_timestamp($"TimeStamp", "yyyy-MM- dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey = df.withColumn("rank",
dense_rank().over(windowSpec3)).filter($"rank" === 1).drop("rank")
推荐阅读
- python - Python sleep 在 VS Code 的 Conda 环境中不起作用
- node.js - SAP HANA 与 Ionic 应用程序的连接(浏览器 /node_modules/java/lib/nodeJavaBridge.js 中的错误)
- spring-boot - 在 Websphere 8.5.5 中部署 Spring Boot 应用程序时出错
- c++ - 指向(数据)成员的指针作为非类型模板参数,例如具有自动存储持续时间/无链接
- python-3.x - 为什么我们使用 hadoop mapreduce 进行数据处理?为什么不在本地机器上做呢?
- javascript - 如何更改电子应用程序中的加载屏幕?
- string - 审查 lua 中的单词
- json.net - newtonsoft json.net 禁用科学记数法
- regex - 正则表达式 - 如果长度超过最大允许长度,则从字符串中检索最后 n 个字符
- python - 已解决:为什么导入 Python 库会引发语法错误?