Filtering the latest records based on a condition on columns

Problem description

Here is my input data frame:

DataPartition   TimeStamp   OrganizationId  SegmentId   GeographicSegment_geographyId   IsSubtracted    Sequence    FFAction|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  27  100002  false   1   O|!|
Japan   2018-05-29T07:52:45+00:00   4295876592  23  null    null    null    D|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  28  100025  false   1   O|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  14  null    null    null    D|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  26  100105  false   1   O|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  6   100131  false   2   O|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  27  112018  false   2   O|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  11  null    null    null    D|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  6   100023  false   1   O|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  25  null    null    null    D|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  29  100029  false   1   O|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  24  null    null    null    D|!|
Japan   2018-05-29T07:52:45+00:00   4295876592  22  null    null    null    D|!|
Japan   2018-05-29T09:11:00+00:00   4295876592  27  100020  false   2   O|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  7   100148  false   1   O|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  21  null    null    null    D|!|

The logic is: for rows with the same OrganizationId and SegmentId, I need to pick the latest records based on the TimeStamp column, with one condition: if a given OrganizationId and SegmentId combination has only a single TimeStamp, I need to keep those rows as they are, but if it has several different timestamps, I only need the rows with the latest one. For example, we have 3 rows for SegmentId 27:

Japan   2018-05-29T09:17:18+00:00   4295876592  27  100002  false   1   O|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  27  112018  false   2   O|!|
Japan   2018-05-29T09:11:00+00:00   4295876592  27  100020  false   2   O|!|

So in the case above we have the same OrganizationId and SegmentId but two different TimeStamps, so I need to keep the two rows with the latest one. The expected output would be:

Japan   2018-05-29T09:17:18+00:00   4295876592  27  100002  false   1   O|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  27  112018  false   2   O|!|

But in another case we have two records for SegmentId 6:

Japan   2018-05-29T09:17:18+00:00   4295876592  6   100131  false   2   O|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  6   100023  false   1   O|!|

In this case the OrganizationId and SegmentId are also the same, but we have only one TimeStamp, so I need to keep both rows.
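
To restate the rule: within each (OrganizationId, SegmentId) group, only the rows that carry the group's latest TimeStamp should survive, and ties on that latest TimeStamp are all kept. A minimal sketch of this rule (only to illustrate the intent, not my actual attempt; it assumes the input is a DataFrame called df with the columns shown above, that the $"col" syntax is available, e.g. in spark-shell or via spark.implicits._, and that grp, ts, maxTs and keptRows are names introduced just for this sketch):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// For every (OrganizationId, SegmentId) group, keep all rows whose
// TimeStamp equals the group's maximum TimeStamp.
val grp = Window.partitionBy("OrganizationId", "SegmentId")

val keptRows = df
  .withColumn("ts", unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss")) // same pattern as used later in the question
  .withColumn("maxTs", max($"ts").over(grp))
  .filter($"ts" === $"maxTs")
  .drop("ts", "maxTs")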

Finally, this is my expected output data frame:

DataPartition   TimeStamp   OrganizationId  SegmentId   GeographicSegment_geographyId   IsSubtracted    Sequence    FFAction|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  27  100002  false   1   O|!|
Japan   2018-05-29T07:52:45+00:00   4295876592  23  null    null    null    D|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  28  100025  false   1   O|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  14  null    null    null    D|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  26  100105  false   1   O|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  6   100131  false   2   O|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  27  112018  false   2   O|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  11  null    null    null    D|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  6   100023  false   1   O|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  25  null    null    null    D|!|
Japan   2018-05-29T09:17:18+00:00   4295876592  29  100029  false   1   O|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  24  null    null    null    D|!|
Japan   2018-05-29T07:52:45+00:00   4295876592  22  null    null    null    D|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  7   100148  false   1   O|!|
Japan   2018-05-29T08:05:17+00:00   4295876592  21  null    null    null    D|!|

This is the code I am trying to use, but with it I miss the records that have the same SegmentId and the same TimeStamp:

val windowSpec3 = Window.partitionBy("OrganizationId", "SegmentId", "TimeStamp")
  .orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey = latestForEachKey2.withColumn("rank", row_number().over(windowSpec3))
  .filter($"rank" === 1).drop("rank")

Tags: scala, apache-spark, apache-spark-sql

Solution


Remove TimeStamp from the partitionBy and try the following:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Partition only by the key columns and order by TimeStamp, latest first.
val windowSpec3 = Window.partitionBy("OrganizationId", "SegmentId")
  .orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)

// dense_rank() assigns rank 1 to every row tied on the latest TimeStamp, so ties are kept.
val latestForEachKey = df.withColumn("rank", dense_rank().over(windowSpec3))
  .filter($"rank" === 1).drop("rank")
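
Why this works: within each (OrganizationId, SegmentId) partition, dense_rank() assigns the same rank to rows whose TimeStamp ties, so every row sharing the latest TimeStamp gets rank 1 and survives the filter, whereas row_number() would keep only one of those tied rows. As a quick self-contained check (not from the original post; it assumes a local SparkSession, uses its own names sample and w, and keeps only the columns that matter here):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("latest-per-key").getOrCreate()
import spark.implicits._

// SegmentId 27 has two distinct timestamps, SegmentId 6 only one (tied) timestamp.
val sample = Seq(
  ("Japan", "2018-05-29T09:17:18+00:00", 4295876592L, 27, 100002),
  ("Japan", "2018-05-29T09:17:18+00:00", 4295876592L, 27, 112018),
  ("Japan", "2018-05-29T09:11:00+00:00", 4295876592L, 27, 100020),
  ("Japan", "2018-05-29T09:17:18+00:00", 4295876592L, 6, 100131),
  ("Japan", "2018-05-29T09:17:18+00:00", 4295876592L, 6, 100023)
).toDF("DataPartition", "TimeStamp", "OrganizationId", "SegmentId", "GeographicSegment_geographyId")

// Same pattern as in the question; on Spark 3 the offset may need to be parsed
// explicitly, e.g. "yyyy-MM-dd'T'HH:mm:ssXXX".
val w = Window.partitionBy("OrganizationId", "SegmentId")
  .orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)

sample.withColumn("rank", dense_rank().over(w))
  .filter($"rank" === 1)
  .drop("rank")
  .show(false)
// SegmentId 27 keeps only its two 09:17:18 rows; SegmentId 6 keeps both of its rows.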
