python - 获取列的“循环滞后”
问题描述
我想根据现有列的滞后值在 pyspark.sql.DataFrame 中创建一个新列。但是......我也希望最后一个值成为第一个值,第一个值成为最后一个值。这是一个例子:
df = spark.createDataFrame([(1,100),
(2,200),
(3,300),
(4,400),
(5,500)],
['id','value'])
df.show()
+---+-----+
| id|value|
+---+-----+
| 1| 100|
| 2| 200|
| 3| 300|
| 4| 400|
| 5| 500|
+---+-----+
所需的输出将是:
+---+-----+----------------+-----------------+
| id|value|lag_value_plus_2|lag_value_minus_2|
+---+-----+----------------+-----------------+
| 1| 100| 300| 400|
| 2| 200| 400| 500|
| 3| 300| 500| 100|
| 4| 400| 100| 200|
| 5| 500| 200| 300|
+---+-----+----------------+-----------------+
我能感觉到它与窗口函数或 pyspark.sql.lag 函数有关,但不知道该怎么做。
解决方案
这是我可以提供的一种解决方案。但我不确定它是否是最优化的:
from functools import reduce
# Duplicate the dataframe twice, one "before" and one "after"
df = reduce(
lambda a, b : a.union(b),
[df.withColumn("x", F.lit(i)) for i in [-1,0,1]]
)
df.withColumn(
"lag_value_plus_2",
F.lead("value", 2).over(Window.partitionBy().orderBy("x", "id"))
).withColumn(
"lag_value_minus_2",
F.lag("value", 2).over(Window.partitionBy().orderBy("x", "id"))
).where("x=0").drop("x").show()
+---+-----+----------------+-----------------+
| id|value|lag_value_plus_2|lag_value_minus_2|
+---+-----+----------------+-----------------+
| 1| 100| 300| 400|
| 2| 200| 400| 500|
| 3| 300| 500| 100|
| 4| 400| 100| 200|
| 5| 500| 200| 300|
+---+-----+----------------+-----------------+
推荐阅读
- html - 为什么 IE11 中图标丢失和文本被压缩?
- android - android录音应用程序无法在智能手机上运行
- c# - 将米添加到基于矢量的点
- python - groupby 后无法访问数据框列
- excel - 在 Excel 中复制范围时 PasteSpecial 不保留源格式
- vb.net - 使用 vb.net 写入新创建的命令提示符窗口
- windows - Windows 和 MacOS 上 Android Studio 中的 Git 从 Github 推/拉
- php - 从sql中检索时更改格式日期
- java - 下载文件时的文件名问题
- python - 使用 Python,我如何将 40,000 个数据集(按照下面列出的标准)分组到一个有 5 列的表中?