python - 根据条件更改窗口pyspark数据框中的所有行值
问题描述
我有一个 pyspark 数据框,它有三列 id、seq、seq_checker。我需要按 id 排序并在 seq_checker 列中检查 4 个连续的 1。
我尝试使用窗口函数。我无法根据条件更改窗口中的所有值。
new_window = Window.partitionBy().orderBy("id").rangeBetween(0, 3)
output = df.withColumn('check_sequence',F.when(F.min(df['seq_checker']).over(new_window) == 1, True))
原始pyspark df:
+---+---+-----------+--------------+ | id|seq|seq_checker|check_sequence| +---+---+-----------+--------------+ | 1| 1| 1| 假| | 2| 2| 1| 假| | 3| 3| 1| 假| | 4| 4| 1| 假| | 5| 10| 0| 假| | 6| 14| 1| 假| | 7| 13| 1| 假| | 8| 18| 0| 假| | 9| 23| 0| 假| | 10| 5| 0| 假| | 11| 56| 0| 假| | 12| 66| 0| 假| | 13| 34| 1| 假| | 14| 35| 1| 假| | 15| 36| 1| 假| | 16| 37| 1| 假| | 17| 39| 0| 假| | 18| 54| 0| 假| | 19| 68| 0| 假| | 20| 90| 0| 假| +---+---+-----------+--------------+
所需输出:
+---+---+-----------+--------------+ | id|seq|seq_checker|check_sequence| +---+---+-----------+--------------+ | 1| 1| 1| 是的| | 2| 2| 1| 是的| | 3| 3| 1| 是的| | 4| 4| 1| 是的| | 5| 10| 0| 假| | 6| 14| 1| 假| | 7| 13| 1| 假| | 8| 18| 0| 假| | 9| 23| 0| 假| | 10| 5| 0| 假| | 11| 56| 0| 假| | 12| 66| 0| 假| | 13| 34| 1| 是的| | 14| 35| 1| 是的| | 15| 36| 1| 是的| | 16| 37| 1| 是的| | 17| 39| 0| 假| | 18| 54| 0| 假| | 19| 68| 0| 假| | 20| 90| 0| 假| +---+---+-----------+--------------+
基于上面的代码,我的输出是:
+---+---+-----------+--------------+ | id|seq|seq_checker|check_sequence| +---+---+-----------+--------------+ | 1| 1| 1| 是的| | 2| 2| 1| 空| | 3| 3| 1| 空| | 4| 4| 1| 空| | 5| 10| 0| 空| | 6| 14| 1| 空| | 7| 13| 1| 空| | 8| 18| 0| 空| | 9| 23| 0| 空| | 10| 5| 0| 空| | 11| 56| 0| 空| | 12| 66| 0| 空| | 13| 34| 1| 是的| | 14| 35| 1| 空| | 15| 36| 1| 空| | 16| 37| 1| 空| | 17| 39| 0| 空| | 18| 54| 0| 空| | 19| 68| 0| 空| | 20| 90| 0| 空| +---+---+-----------+--------------+
编辑: 1. 如果我们有超过 4 个连续的行有 1,我们需要将所有行的 check_sequence 标志更改为 True。
- 我的实际问题是在“seq”列中检查长度大于 4 的序列。我能够使用领先和滞后功能创建 seq_checker 列。
解决方案
rangeBetween使您可以访问与当前行相关的行。您为 0,3 定义了一个窗口,可让您访问当前行和随后的三个行,但这只会为 4 个连续 1 行中的第一个 1 设置正确的值。1 的 4 个连续行的第二个元素需要访问前一行和后两行 (-1,2)。1 的 4 个连续行的第三个元素需要访问前两行和后两行 (-2,1)。最后,连续 4 行 1 的第四个元素需要访问前三行(-3,0)。
import pyspark.sql.functions as F
from pyspark.sql import Window
l = [
( 1, 1, 1),
( 2, 2, 1),
( 3, 3, 1),
( 4, 4, 1),
( 5, 10, 0),
( 6, 14, 1),
( 7, 13, 1),
( 8, 18, 0),
( 9, 23, 0),
( 10, 5, 0),
( 11, 56, 0),
( 12, 66, 0),
( 13, 34, 1),
( 14, 35, 1),
( 15, 36, 1),
( 16, 37, 1),
( 17, 39, 0),
( 18, 54, 0),
( 19, 68, 0),
( 20, 90, 0)
]
columns = ['Id','seq','seq_checker']
df=spark.createDataFrame(l, columns)
w1 = Window.partitionBy().orderBy("id").rangeBetween(0, 3)
w2 = Window.partitionBy().orderBy("id").rangeBetween(-1, 2)
w3 = Window.partitionBy().orderBy("id").rangeBetween(-2, 1)
w4 = Window.partitionBy().orderBy("id").rangeBetween(-3, 0)
output = df.withColumn('check_sequence',F.when(
(F.min(df['seq_checker']).over(w1) == 1) |
(F.min(df['seq_checker']).over(w2) == 1) |
(F.min(df['seq_checker']).over(w3) == 1) |
(F.min(df['seq_checker']).over(w4) == 1)
, True).otherwise(False))
output.show()
输出:
+---+---+-----------+--------------+
| Id|seq|seq_checker|check_sequence|
+---+---+-----------+--------------+
| 1| 1| 1| true|
| 2| 2| 1| true|
| 3| 3| 1| true|
| 4| 4| 1| true|
| 5| 10| 0| null|
| 6| 14| 1| null|
| 7| 13| 1| null|
| 8| 18| 0| null|
| 9| 23| 0| null|
| 10| 5| 0| null|
| 11| 56| 0| null|
| 12| 66| 0| null|
| 13| 34| 1| true|
| 14| 35| 1| true|
| 15| 36| 1| true|
| 16| 37| 1| true|
| 17| 39| 0| null|
| 18| 54| 0| null|
| 19| 68| 0| null|
| 20| 90| 0| null|
+---+---+-----------+--------------+
推荐阅读
- r - 绘制 [xts] 时忽略 cex.main
- android - Calendar.DAY_OF_WEEK 下个月不工作
- python - item() 函数在访问像素值时如何工作?
- java - Springboot2使用StringRedisTemplate无法连接docker中的redis集群
- java - @Autowired springboot 对象为空
- javascript - 捕获 AWS S3 获取对象流错误 Node.js
- graph - 检查拓扑排序的有效性
- python - 如何在python中将两个字典列表连接在一起?
- azure-databricks - Databrick - 从挂载的文件存储中读取 BLOB
- r - 获取矩阵每一列的第一个 TRUE 元素的索引,但索引必须 > 到 n