首页 > 解决方案 > 当 PYSPARK 中每月的行数发生变化时,如何实现每月延迟?

问题描述

我想创建一个每月滞后以计算每月回报。

这是我的数据框:

REG_DT_YYYYMM TYPE_CD 生产
202005 足球俱乐部 412316860416
202005 LG 420906795008
202005 力劲 429496729600
202006 足球俱乐部 438086664192
202006 LG 446676598784
202006 力劲 455266533376
202007 足球俱乐部 463856467968
202007 LG 472446402560
202007 力劲 481036337152
202008 足球俱乐部 489626271744
202008 LG 498216206336
202008 力劲 506806140928
202009 足球俱乐部 515396075520
202009 LG 523986010112
202009 力劲 532575944704
202010 足球俱乐部 541165879296
202010 LG 549755813888
202010 力劲 558345748480
202010 LT 566935683072
202011 足球俱乐部 575525617664
202011 LG 584115552256
202011 力劲 592705486848
202011 LT 601295421440
202012 足球俱乐部 609885356032
202012 LG 618475290624
202012 力劲 627065225216
202012 LT 635655159808
202101 足球俱乐部 644245094400
202101 LG 652835028992
202101 力劲 661424963584
202101 LT 670014898176

我现在想创建一个滞后,以便我可以比较每月的生产值。我的问题在于我们从 202010 年及以后有一个额外的行 LT。此外,将来可能会有更多行。

标签: dataframeapache-sparkpysparkapache-spark-sql

解决方案


您可以通过在滞后窗口中设置分区来获取每种类型的前一行:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'lag_production', 
    F.lag('production').over(
        Window.partitionBy('TYPE_CD')
              .orderBy('REG_DT_YYYYMM')
    )
)

推荐阅读