pyspark - 如何使用 pyspark 设置动态 where 子句
问题描述
我有一个数据集,其中有多个组。我有一个排名列,它递增计数每组的每个条目。这种结构的一个例子如下所示:
+-----------+---------+---------+
| equipment| run_id|run_order|
+-----------+---------+---------+
|1 |430032589| 1|
|1 |430332632| 2|
|1 |430563033| 3|
|1 |430785715| 4|
|1 |431368577| 5|
|1 |431672148| 6|
|2 |435497596| 1|
|1 |435522469| 7|
每个组(设备)都有不同的运行次数。如上所示,设备 1 有 7 个运行,而设备 2 有 1 个运行。我想为每台设备选择第一次和最后一次 n 次运行。选择前 n 次运行很简单:
df.select("equipment", "run_id").distinct().where(df.run_order <= n).orderBy("equipment").show()
不同之处在于查询,因为每一行都相当于一个时间步长,因此每一行都将记录与该时间步长关联的传感器读数。因此会有很多行具有相同的设备,run_id 和 run_order,它们应该保留在最终结果中而不是聚合。
由于每个设备的运行次数都是唯一的,因此我无法使用 where 子句(我认为)进行等效的选择查询来获得最后的 n 次运行:
df.select("equipment", "run_id").distinct().where(df.rank >= total_runs - n).orderBy("equipment").show()
我可以运行 groupBy 以获得每个设备的最高 run_order
+-----------+----------------+
| equipment| max(run_order) |
+-----------+----------------+
|1 | 7|
|2 | 1|
但我不确定是否有一种方法可以构建一个像这样工作的动态 where 子句。这样我就可以获得最后 n 次运行(包括每次运行的所有时间步长数据)。
解决方案
您可以为每个设备添加最高等级的列,并根据该列进行过滤:
from pyspark.sql import functions as F, Window
n = 3
df2 = df.withColumn(
'max_run',
F.max('run_order').over(Window.partitionBy('equipment'))
).where(F.col('run_order') >= F.col('max_run') - n)
推荐阅读
- linux - 随机排序不适用于 --random_source=FILE
- c# - 连接字符串时,C# 中的 for 循环为何如此缓慢?
- javascript - assert.fail(1, 2) 不显示 node.js 10 的弃用警告
- python - 如何使用两个嵌套列表来制作字典列表?
- error-handling - 在 Smalltalk/Pharo 中创建以块为参数的键值消息
- java - 由于缓慢的 IO 调用,Google Dataflow 的工作人员几乎处于空闲状态
- javascript - Typescript Promise 拒绝类型
- svg - SVG 旋转(矩阵)矩形的宽度。看起来像宽度和高度数字被缩放
- php - 如何获取数组的值?
- reactjs - Redux 表单:访问 JSX 表单中的字段组件 meta.touched 属性