apache-spark - 加入两个时间序列数据框以获取 PySpark 中每个左条目的最新右条目
问题描述
我有两个 Sparks 数据框:
df1
每个id
和一个条目date
:
|date |id |
+-----------+-----+
|2021-11-15 | 1|
|2021-11-14 | 1|
|2021-11-15 | 2|
|2021-11-14 | 2|
|2021-11-15 | 3|
|2021-11-14 | 3|
df2
有多个日志条目:
|date |id |
+-----------+-----+
|2021-11-13 | 1|
|2021-11-13 | 1|
|2021-11-13 | 3|
|2021-11-14 | 1|
|2021-11-14 | 1|
|2021-11-14 | 1|
|2021-11-14 | 1|
|2021-11-15 | 1|
|2021-11-15 | 1|
id
我怎样才能加入这些 dfs ,以便我获得最新的可能条目(日期(df2)应该是 <= 日期date
)df2
?
|date |id | date(df2)|
+-----------+------+------------+
|2021-11-15 | 1 | 2021-11-15 |
|2021-11-14 | 1 | 2021-11-14 |
|2021-11-15 | 2 | null |
|2021-11-14 | 2 | null |
|2021-11-15 | 3 | 2021-11-13 |
|2021-11-14 | 3 | 2021-11-13 |
THX 成数字
解决方案
使用 join 然后 group by df1.id
anddf2.date
并使用条件聚合来获取最大值df2.date <= df1.date
import pyspark.sql.functions as F
result_df = df1.join(
df2.withColumnRenamed("date", "df2_date"),
["id"],
"left"
).groupBy("id", "date").agg(
F.max(
F.when(F.col("df2_date") <= F.col("date"), F.col("df2_date"))
).alias("df2_date")
)
result_df.show()
#+---+----------+----------+
#| id| date| df2_date|
#+---+----------+----------+
#| 1|2021-11-14|2021-11-14|
#| 1|2021-11-15|2021-11-15|
#| 2|2021-11-14| null|
#| 2|2021-11-15| null|
#| 3|2021-11-14|2021-11-13|
#| 3|2021-11-15|2021-11-13|
#+---+----------+----------+
推荐阅读
- python - 我的机器人无法正确运行 on_message/on_ready 事件
- python - 如何计算每组的分数并推导出每组的平均分,
- php - 从php将json对象插入mongodb数组字段
- celery - 芹菜工人不读取热解图会话文件
- ubuntu - 无法修复 linux mint 上损坏的应用程序
- android - 如何修复 android studio 首次运行问题?
- javascript - 使用 Javascript 的屏幕截图截图工具
- python - 使用类名而不是类型的 Python“isinstance”
- mapbox - MapBox NumberFormat 预期语法
- c++ - boost::asio::ip::tcp::acceptor 在使用 async_accept 接收连接请求时终止应用程序