python - 时间序列不一致的pyspark滞后函数
问题描述
import pyspark.sql.functions as F
from pyspark.sql.window import Window
我想使用窗口函数从 4 个周期前的列中查找值。
假设我的数据(df)看起来像这样(实际上我有很多不同的 ID):
ID | value | period
a | 100 | 1
a | 200 | 2
a | 300 | 3
a | 400 | 5
a | 500 | 6
a | 600 | 7
如果时间序列是一致的(例如第 1-6 期),我可以使用F.lag(df['value'], count=4).over(Window.partitionBy('id').orderBy('period'))
但是,由于时间序列具有不连续性,因此值会发生偏移。
我想要的输出是这样的:
ID | value | period | 4_lag_value
a | 100 | 1 | nan
a | 200 | 2 | nan
a | 300 | 3 | nan
a | 400 | 5 | 100
a | 500 | 6 | 200
a | 600 | 7 | 300
我怎样才能在 pyspark 中做到这一点?
解决方案
这可能是您正在寻找的:
from pyspark.sql import Window, functions as F
def pyspark_timed_lag_values(df, lags, avg_diff, state_id='state_id', ds='ds', y='y'):
interval_expr = 'sequence(min_ds, max_ds, interval {0} day)'.format(avg_diff)
all_comb = (df.groupBy(F.col(state_id))
.agg(F.min(ds).alias('min_ds'), F.max(ds).alias('max_ds'))
.withColumn(ds, F.explode(F.expr(interval_expr)))
.select(*[state_id, ds]))
all_comb = all_comb.join(df.withColumn('exists', F.lit(True)), on=[state_id, ds], how='left')
window = Window.partitionBy(state_id).orderBy(F.col(ds).asc())
for lag in lags:
all_comb = all_comb.withColumn("{0}_{1}".format(y, lag), F.lag(y, lag).over(window))
all_comb = all_comb.filter(F.col('exists')).drop(*['exists'])
return all_comb
让我们将其应用于一个示例:
data = spark.sparkContext.parallelize([
(1,"2021-01-03",100),
(1,"2021-01-10",830),
(1,"2021-01-17",300),
(1,"2021-02-07",450),
(2,"2021-01-03",500),
(2,"2021-01-17",800),
(2,"2021-02-14",800)])
example = spark.createDataFrame(data, ['state_id','ds','y'])
example = example.withColumn('ds', F.to_date(F.col('ds')))
lags = list(range(1, n_periods + 1))
result = timed_lag_values(example, lags = lags, avg_diff = 7)
导致以下结果:
+--------+----------+---+----+----+----+----+----+----+----+
|state_id| ds| y| y_1| y_2| y_3| y_4| y_5| y_6| y_7|
+--------+----------+---+----+----+----+----+----+----+----+
| 1|2021-01-03|100|null|null|null|null|null|null|null|
| 1|2021-01-10|830| 100|null|null|null|null|null|null|
| 1|2021-01-17|300| 830| 100|null|null|null|null|null|
| 1|2021-02-07|450|null|null| 300| 830| 100|null|null|
| 2|2021-01-03|500|null|null|null|null|null|null|null|
| 2|2021-01-17|800|null| 500|null|null|null|null|null|
| 2|2021-02-14|800|null|null|null| 800|null| 500|null|
+--------+----------+---+----+----+----+----+----+----+----+
现在,它已经为日期做好了准备,但稍作调整,它应该适用于各种用例。在这种情况下,缺点是必须应用 explode 来创建所有可能的日期组合并创建助手 DataFrame all_comb
。
这个解决方案的真正好处是它适用于处理时间序列的大多数用例,因为参数avg_diff
定义了时间段之间的预期距离。
顺便提一下,可能有一个更干净的 Hive SQL 替代方案。
推荐阅读
- screen-readers - Have screen reader pronounce Country codes in a text input as individual letters
- angular - Autodesk Forge Viewer - Angular
- java - Passing one message to just one messagelistener with Spring data redis
- css - React Bootstrap:对齐 col 使右侧的 col 显示在左侧 col 的顶部
- c# - How should I update existing table in the SQL Server database via C# and Windows forms?
- gitlab - Cypress test not finding element when ran in Gitlab CI
- flutter - Can I clear all my subscribed FCM topics by calling deleteToken?
- sql - 获取最接近凌晨 05:00 的记录。我有一个小时和分钟的专栏,需要获取最接近凌晨 5 点的记录。硬编码到凌晨 5 点也可以
- python - Does Python.NET have an analog of Py_InitModule() in C++?
- python - 在python中获取当前一周的开始和结束时间戳