首页 > 解决方案 > PySpark Dataframe 中从一列到另一列的最近日期

问题描述

我有一个 pyspark 数据框,其中提到了商品的价格,但是没有关于商品何时购买的数据,我只有 1 年的范围。

+---------+------------+----------------+----------------+
|Commodity| BuyingPrice|Date_Upper_limit|Date_lower_limit|
+---------+------------+----------------+----------------+
|    Apple|           5|      2020-07-04|      2019-07-03|
|   Banana|           3|      2020-07-03|      2019-07-02|
|   Banana|           4|      2019-10-02|      2018-10-01|
|    Apple|           6|      2020-01-20|      2019-01-19|
|   Banana|         3.5|      2019-08-17|      2018-08-16|
+---------+------------+----------------+----------------+

我有另一个 pyspark 数据框,可以在其中查看所有商品的市场价格和日期。

+----------+----------+------------+
|      Date| Commodity|Market Price|
+----------+----------+------------+
|2020-07-01|     Apple|           3|
|2020-07-01|    Banana|           3|
|2020-07-02|     Apple|           4|
|2020-07-02|    Banana|         2.5|
|2020-07-03|     Apple|           7|
|2020-07-03|    Banana|           4|
+----------+----------+------------+

当该商品的市场价格(MP)<或=购买价格(BP)时,我想查看最接近日期上限的日期。

预期输出(用于 2 个顶部列):

+---------+------------+----------------+----------------+--------------------------------+
|Commodity| BuyingPrice|Date_Upper_limit|Date_lower_limit|Closest Date to UL when MP <= BP|
+---------+------------+----------------+----------------+--------------------------------+
|    Apple|           5|      2020-07-04|      2019-07-03|                      2020-07-02|
|   Banana|           3|      2020-07-03|      2019-07-02|                      2020-07-02|
+---------+------------+----------------+----------------+--------------------------------+

尽管 Apple 在 2020 年 7 月 1 日(3 美元)的价格要低得多,但由于 2020 年 7 月 2 日是 MP <= BP 时从日期上限 (UL) 倒退的第一个日期。所以,我选择了 2020-07-02。

我怎样才能向后看以填写可能购买的日期?

标签: dateapache-sparkpysparkwindow-functionspyspark-dataframes

解决方案


试试这个conditional joinwindow function

from pyspark.sql import functions as F
from pyspark.sql.window import Window  

w=Window().partitionBy("Commodity")

df1\  #first dataframe shown being df1 and second being df2
   .join(df2.withColumnRenamed("Commodity","Commodity1")\
         , F.expr("""`Market Price`<=BuyingPrice and Date<Date_Upper_limit and Commodity==Commodity1"""))\
   .drop("Market Price","Commodity1")\
   .withColumn("max", F.max("Date").over(w))\
   .filter('max==Date').drop("max").withColumnRenamed("Date","Closest Date to UL when MP <= BP")\
   .show()

#+---------+-----------+----------------+----------------+--------------------------------+
#|Commodity|BuyingPrice|Date_Upper_limit|Date_lower_limit|Closest Date to UL when MP <= BP|
#+---------+-----------+----------------+----------------+--------------------------------+
#|   Banana|        3.0|      2020-07-03|      2019-07-02|                      2020-07-02|
#|    Apple|        5.0|      2020-07-04|      2019-07-03|                      2020-07-02|
#+---------+-----------+----------------+----------------+--------------------------------+

推荐阅读