apache-spark - 基于另一列的滞后窗口函数
问题描述
我有以下 Spark DataFrame:
ID | 月 | 列_1 | column_2 |
---|---|---|---|
一种 | 1 | 100 | 0 |
一种 | 2 | 200 | 1 |
一种 | 3 | 800 | 2 |
一种 | 4 | 1500 | 3 |
一种 | 5 | 1200 | 0 |
一种 | 6 | 1600 | 1 |
一种 | 7 | 2500 | 2 |
一种 | 8 | 2800 | 3 |
一种 | 9 | 3000 | 4 |
我想创建一个新列,让我们根据 column_2 给出的动态滞后将其称为“dif_column1”。所需的输出将是:
ID | 月 | 列_1 | column_2 | 差异列1 |
---|---|---|---|---|
一种 | 1 | 100 | 0 | 0 |
一种 | 2 | 200 | 1 | 100 |
一种 | 3 | 800 | 2 | 700 |
一种 | 4 | 1500 | 3 | 1400 |
一种 | 5 | 1200 | 0 | 0 |
一种 | 6 | 1600 | 1 | 400 |
一种 | 7 | 2500 | 2 | 1300 |
一种 | 8 | 2800 | 3 | 1600 |
一种 | 9 | 3000 | 4 | 1800 |
我曾尝试使用 lag 函数,但显然我只能将整数与 lag 函数一起使用,因此它不起作用:
w = Window.partitionBy("id")
sdf = sdf.withColumn("dif_column1", F.col("column_1") - F.lag("column_1",F.col("column_2")).over(w))
解决方案
您可以添加行号列,并根据行号和 column_2 中定义的滞后进行自联接:
from pyspark.sql import functions as F, Window
w = Window.partitionBy("id").orderBy("month")
df1 = df.withColumn('rn', F.row_number().over(w))
df2 = df1.alias('t1').join(
df1.alias('t2'),
F.expr('(t1.id = t2.id) and (t1.rn = t2.rn + t1.column_2)'),
'left'
).selectExpr(
't1.*',
't1.column_1 - t2.column_1 as dif_column1'
).drop('rn')
df2.show()
+---+-----+--------+--------+-----------+
| id|month|column_1|column_2|dif_column1|
+---+-----+--------+--------+-----------+
| A| 1| 100| 0| 0|
| A| 2| 200| 1| 100|
| A| 3| 800| 2| 700|
| A| 4| 1500| 3| 1400|
| A| 5| 1200| 0| 0|
| A| 6| 1600| 1| 400|
| A| 7| 2500| 2| 1300|
| A| 8| 2800| 3| 1600|
| A| 9| 3000| 4| 1800|
+---+-----+--------+--------+-----------+
推荐阅读
- c# - .net C# - Winform 在标题栏上出现“未响应”消息后被停用
- c# - 从矢量 CANoe 对象启动/执行 XML 测试节点
- node.js - 如何将 Promise.all 与 fs.unlink 回调一起使用?
- python - 使用python在Android手机上下载图片
- highcharts - 如何在 highcharts 中导出带有自定义 html 部分的图表?
- google-tag-manager - 多行的dataLayer是可能的吗?
- php - PHP while循环在发送后中断
- python - Python触发/模拟自定义浏览器协议
- r - 从 excel 电子表格在 R 中制作出版质量表
- javascript - 如何使用 JavaScript 获取设备的最准确 GPS 坐标?