python - Python Spark - 阈值后删除数据 - Pyspark
问题描述
如何在最后一次TP == 1
使用 48 小时的缓冲区后删除数据?
例如ID = A9
,最后一个TP == 1
是 on 2020-05-06 13:00
。我想将该组 ID 的所有数据保留到2020-05-06 13:00
最后TP == 1
加上接下来 2 天的位置?
+---++--------+----------------+
| id| TP| Date|
+---+---------+----------------+
| A1| Null|2010-01-01 12:00|
| A1| Null|2010-01-01 13:00|
| A1| 1|2010-01-02 01:00|
| A1| Null|2010-01-02 02:00|
| A9| Null|2010-05-05 12:00|
| A9| 1|2010-05-05 13:00|
| A9| 1|2010-05-06 13:00|
| A9| Null|2010-05-09 13:00|
+---+---------+----------------+
所需的数据框
+---++--------+----------------+
| id| TP| Date|
+---+---------+----------------+
| A1| Null|2010-01-01 12:00|
| A1| Null|2010-01-01 13:00|
| A1| 1|2010-01-02 01:00|
| A1| Null|2010-01-02 02:00|
| A9| Null|2010-05-05 12:00|
| A9| 1|2010-05-05 13:00|
| A9| 1|2010-05-06 13:00|
+---+---------+----------------+
这就是我在 Pandas 中所做的,但对于 15M+ 的观察来说效率不高
main_pd = main.toPandas()
bigdf = pd.DataFrame()
for i in main_pd.ID.unique():
df = main_pd[main_pd.ID == i]
TPdate = df[df.TP == 1]['Date'].max()+pd.Timedelta('3 days 0 hours')
df = df[(df.Date <= TPdate)]
bigdf = bigdf.append(df)
解决方案
IIUC,您可以使用 Window 函数查找max(IF(TP=1, Date, NULL))
每个id
然后按此阈值过滤:
from pyspark.sql import Window, functions as F
w1 = Window.partitionBy('id')
df_new = df.withColumn('Date', F.to_timestamp('Date', 'yyyy-MM-dd HH:mm')) \
.withColumn('threshhold_date', F.expr("max(IF(TP=1, Date, NULL))").over(w1)) \
.filter('Date <= threshhold_date + interval 2 days')
df_new.show()
+---+----+-------------------+-------------------+
| id| TP| Date| threshhold_date|
+---+----+-------------------+-------------------+
| A9|Null|2010-05-05 12:00:00|2010-05-06 13:00:00|
| A9| 1|2010-05-05 13:00:00|2010-05-06 13:00:00|
| A9| 1|2010-05-06 13:00:00|2010-05-06 13:00:00|
| A1|Null|2010-01-01 12:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-01 13:00:00|2010-01-02 01:00:00|
| A1| 1|2010-01-02 01:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-02 02:00:00|2010-01-02 01:00:00|
+---+----+-------------------+-------------------+
推荐阅读
- mysql - 当使用存储过程执行 MySQL 查询时,是否可以将 MySQL 错误记录到文本文件?
- apache-camel - Camel,如何在直接端点上创建真正的同步
- mongodb - 带有类似查询的 Mongodb 遗留 UUID
- c# - 如何添加一个函数来执行连续 ping 并使用关于 ping 响应时间的不同图标更新我的 gridview 中的结果?
- python - 缺少日期的 Pandas 日期行
- reporting-services - 针对 MVC 并发问题的 SSRS 报告查看器控件
- reactjs - 从反应中的测试自动生成演示
- c# - 无法使用 MVVM 访问 XAML 多选 GridView 中的 SelectedItems
- docker - Prisma 无法验证数据库服务器
- clickhouse - sflow 在 ClickHouse 中计算每个 src/dst ip 的 bps