apache-spark - 在pyspark中按日期过滤最近的行
问题描述
我想在我的 pyspark 数据框中按日期过滤。
我有一个这样的数据框:
+------+---------+------+-------------------+-------------------+----------+
|amount|cost_type|place2| min_ts| max_ts| ds|
+------+---------+------+-------------------+-------------------+----------+
|100000| reorder| 1.0|2020-10-16 10:16:31|2020-11-21 18:50:27|2021-05-29|
|250000|overusage| 1.0|2020-11-21 18:48:02|2021-02-09 20:07:28|2021-05-29|
|100000|overusage| 1.0|2019-05-12 16:00:40|2020-11-21 18:44:04|2021-05-29|
|200000| reorder| 1.0|2020-11-21 19:00:09|2021-05-29 23:56:25|2021-05-29|
+------+---------+------+-------------------+-------------------+----------+
我想为每个可能的 <code>cost_type 过滤一行,ds
例如,ds = '2021-05-29'
我的过滤器应该选择第二行和第四行。但是对于ds = '2020-05-01'
应该选择我的数据框的第一行和第三行。如果我的 ds 在 <code>min_ts 范围内,max_ts
我的过滤器应该为每种成本类型选择该行。
解决方案
一种可能的方法是根据某些条件分配行号:
- 是否
ds
介于min_ts
和之间max_ts
。 ds
如果不是,则和之间min_ts
或ds
和之间的绝对日期差中的较小者max_ts
。
from pyspark.sql import functions as F, Window
w = Window.partitionBy('cost_type').orderBy(
F.col('ds').cast('timestamp').between(F.col('min_ts'), F.col('max_ts')).desc(),
F.least(F.abs(F.datediff('ds', 'max_ts')), F.abs(F.datediff('ds', 'min_ts')))
)
df2 = df.withColumn('rn', F.row_number().over(w)).filter('rn = 1').drop('rn')
推荐阅读
- javascript - Keycloak javascript适配器未授权iframe
- javascript - Vue Javascript forEach循环内部方法
- javascript - 在 odoo 14 中实现基本的工作 owl.carousel.js
- java - 虽然循环不起作用错误:二元运算符'!='Java的错误操作数类型
- google-chrome - 使用批处理语言 CMD 循环检查 chrome 是否仍在工作
- javascript - 如何减慢单个请求/ API 端点的开发速度?
- c# - 使用自动映射器按惯例展开
- xml - 为什么在 Flutter 中为 xml 解析返回 null 模型类列表?
- json - JOLT 将日期和时间转换为时间戳
- c# - 从 C# 中的 .txt 文件中读取和删除随机行