首页 > 解决方案 > 加入两个时间序列数据框以获取 PySpark 中每个左条目的最新右条目

问题描述

我有两个 Sparks 数据框:

df1每个id和一个条目date

|date       |id   |
+-----------+-----+
|2021-11-15 |    1|
|2021-11-14 |    1|
|2021-11-15 |    2|
|2021-11-14 |    2|
|2021-11-15 |    3|
|2021-11-14 |    3|

df2有多个日志条目:

|date       |id   |
+-----------+-----+
|2021-11-13 |    1|
|2021-11-13 |    1|
|2021-11-13 |    3|
|2021-11-14 |    1|
|2021-11-14 |    1|
|2021-11-14 |    1|
|2021-11-14 |    1|
|2021-11-15 |    1|
|2021-11-15 |    1|

id我怎样才能加入这些 dfs ,以便我获得最新的可能条目(日期(df2)应该是 <= 日期datedf2

|date       |id    |   date(df2)| 
+-----------+------+------------+
|2021-11-15 |    1 | 2021-11-15 |
|2021-11-14 |    1 | 2021-11-14 |
|2021-11-15 |    2 |       null |
|2021-11-14 |    2 |       null |
|2021-11-15 |    3 | 2021-11-13 |
|2021-11-14 |    3 | 2021-11-13 |

THX 成数字

标签: apache-sparkpysparkapache-spark-sql

解决方案


使用 join 然后 group by df1.idanddf2.date并使用条件聚合来获取最大值df2.date <= df1.date

import pyspark.sql.functions as F


result_df = df1.join(
    df2.withColumnRenamed("date", "df2_date"),
    ["id"],
    "left"
).groupBy("id", "date").agg(
    F.max(
        F.when(F.col("df2_date") <= F.col("date"), F.col("df2_date"))
    ).alias("df2_date")
)

result_df.show()
#+---+----------+----------+
#| id|      date|  df2_date|
#+---+----------+----------+
#|  1|2021-11-14|2021-11-14|
#|  1|2021-11-15|2021-11-15|
#|  2|2021-11-14|      null|
#|  2|2021-11-15|      null|
#|  3|2021-11-14|2021-11-13|
#|  3|2021-11-15|2021-11-13|
#+---+----------+----------+

推荐阅读