首页 > 解决方案 > 将列添加到 Spark 数据框中,其最大值小于当前记录的值

问题描述

我有一个类似于以下内容的 Spark 数据框:

id  claim_id                 service_date                  status   product
123 10606134411906233408    2018-09-17T00:00:00.000+0000    PD      blue
123 10606147900401009928    2019-01-24T00:00:00.000+0000    PD      yellow
123 10606160940704723994    2019-05-23T00:00:00.000+0000    RV      yellow
123 10606171648203079553    2019-08-29T00:00:00.000+0000    RJ      blue
123 10606186611407311724    2020-01-13T00:00:00.000+0000    PD      blue

请原谅我没有粘贴任何代码,因为没有任何效果。我想添加一个新列,其中状态为 PD 的前一行的 max(service_date) 并且当前行的乘积 = 上一行的乘积。

这可以通过相关子查询轻松完成,但效率不高,此外,在 Spark 中也不可行,因为不支持非 equi 连接。另请注意,LAG 将不起作用,因为我并不总是需要前一个记录(并且偏移量将是动态的)。

预期的输出将是这样的:

id  claim_id                 service_date                  status   product     previous_service_date
    123 10606134411906233408    2018-09-17T00:00:00.000+0000    PD      blue
    123 10606147900401009928    2019-01-24T00:00:00.000+0000    PD      yellow
    123 10606160940704723994    2019-05-23T00:00:00.000+0000    RV      yellow      2019-01-24T00:00:00.000+0000
    123 10606171648203079553    2019-08-29T00:00:00.000+0000    RJ      blue        2018-09-17T00:00:00.000+0000
    123 10606186611407311724    2020-01-13T00:00:00.000+0000    PD      blue        2018-09-17T00:00:00.000+0000

标签: pythonapache-sparkpysparkdatabricks

解决方案


您可以copy将 DataFrame 转换为新的 DataFrame ( df2),join如下所示:

(df.join(df2, 
         on = [df.Service_date > df2.Service_date,
               df.product == df2.product,
               df2.status == 'PD'],
         how = "left"))

删除重复的列并将其重命名df2.Service_dateprevious_service_date


推荐阅读