首页 > 解决方案 > 在pyspark中按日期过滤最近的行

问题描述

我想在我的 pyspark 数据框中按日期过滤。

我有一个这样的数据框:

+------+---------+------+-------------------+-------------------+----------+
|amount|cost_type|place2|             min_ts|             max_ts|        ds|
+------+---------+------+-------------------+-------------------+----------+
|100000|  reorder|   1.0|2020-10-16 10:16:31|2020-11-21 18:50:27|2021-05-29|
|250000|overusage|   1.0|2020-11-21 18:48:02|2021-02-09 20:07:28|2021-05-29|
|100000|overusage|   1.0|2019-05-12 16:00:40|2020-11-21 18:44:04|2021-05-29|
|200000|  reorder|   1.0|2020-11-21 19:00:09|2021-05-29 23:56:25|2021-05-29|
+------+---------+------+-------------------+-------------------+----------+

我想为每个可能的 ‍‍<code>cost_type 过滤一行,ds 例如,ds = '2021-05-29'我的过滤器应该选择第二行和第四行。但是对于ds = '2020-05-01'应该选择我的数据框的第一行和第三行。如果我的 ds 在 ‍‍‍<code>min_ts 范围内,max_ts我的过滤器应该为每种成本类型选择该行。

标签: apache-sparkpysparkapache-spark-sql

解决方案


一种可能的方法是根据某些条件分配行号:

  1. 是否ds介于min_ts和之间max_ts
  2. ds如果不是,则和之间min_tsds和之间的绝对日期差中的较小者max_ts
from pyspark.sql import functions as F, Window

w = Window.partitionBy('cost_type').orderBy(
    F.col('ds').cast('timestamp').between(F.col('min_ts'), F.col('max_ts')).desc(), 
    F.least(F.abs(F.datediff('ds', 'max_ts')), F.abs(F.datediff('ds', 'min_ts')))
)

df2 = df.withColumn('rn', F.row_number().over(w)).filter('rn = 1').drop('rn')

推荐阅读