首页 > 解决方案 > 基于另一列的滞后窗口函数

问题描述

我有以下 Spark DataFrame:

ID 列_1 column_2
一种 1 100 0
一种 2 200 1
一种 3 800 2
一种 4 1500 3
一种 5 1200 0
一种 6 1600 1
一种 7 2500 2
一种 8 2800 3
一种 9 3000 4

我想创建一个新列,让我们根据 column_2 给出的动态滞后将其称为“dif_column1”。所需的输出将是:

ID 列_1 column_2 差异列1
一种 1 100 0 0
一种 2 200 1 100
一种 3 800 2 700
一种 4 1500 3 1400
一种 5 1200 0 0
一种 6 1600 1 400
一种 7 2500 2 1300
一种 8 2800 3 1600
一种 9 3000 4 1800

我曾尝试使用 lag 函数,但显然我只能将整数与 lag 函数一起使用,因此它不起作用:

w = Window.partitionBy("id")
sdf = sdf.withColumn("dif_column1", F.col("column_1") - F.lag("column_1",F.col("column_2")).over(w))

标签: apache-sparkpysparkapache-spark-sql

解决方案


您可以添加行号列,并根据行号和 column_2 中定义的滞后进行自联接:

from pyspark.sql import functions as F, Window

w = Window.partitionBy("id").orderBy("month")

df1 = df.withColumn('rn', F.row_number().over(w)) 

df2 = df1.alias('t1').join(
    df1.alias('t2'),
    F.expr('(t1.id = t2.id) and (t1.rn = t2.rn + t1.column_2)'),
    'left'
).selectExpr(
    't1.*',
    't1.column_1 - t2.column_1 as dif_column1'
).drop('rn')

df2.show()
+---+-----+--------+--------+-----------+
| id|month|column_1|column_2|dif_column1|
+---+-----+--------+--------+-----------+
|  A|    1|     100|       0|          0|
|  A|    2|     200|       1|        100|
|  A|    3|     800|       2|        700|
|  A|    4|    1500|       3|       1400|
|  A|    5|    1200|       0|          0|
|  A|    6|    1600|       1|        400|
|  A|    7|    2500|       2|       1300|
|  A|    8|    2800|       3|       1600|
|  A|    9|    3000|       4|       1800|
+---+-----+--------+--------+-----------+

推荐阅读