首页 > 解决方案 > 如何使用 pyspark 设置动态 where 子句

问题描述

我有一个数据集,其中有多个组。我有一个排名列,它递增计数每组的每个条目。这种结构的一个例子如下所示:

+-----------+---------+---------+
|  equipment|   run_id|run_order|
+-----------+---------+---------+
|1          |430032589|        1|
|1          |430332632|        2|
|1          |430563033|        3|
|1          |430785715|        4|
|1          |431368577|        5|
|1          |431672148|        6|
|2          |435497596|        1|
|1          |435522469|        7|

每个组(设备)都有不同的运行次数。如上所示,设备 1 有 7 个运行,而设备 2 有 1 个运行。我想为每台设备选择第一次和最后一次 n 次运行。选择前 n 次运行很简单:

df.select("equipment", "run_id").distinct().where(df.run_order <= n).orderBy("equipment").show()

不同之处在于查询,因为每一行都相当于一个时间步长,因此每一行都将记录与该时间步长关联的传感器读数。因此会有很多行具有相同的设备,run_id 和 run_order,它们应该保留在最终结果中而不是聚合。

由于每个设备的运行次数都是唯一的,因此我无法使用 where 子句(我认为)进行等效的选择查询来获得最后的 n 次运行:

df.select("equipment", "run_id").distinct().where(df.rank >= total_runs - n).orderBy("equipment").show()

我可以运行 groupBy 以获得每个设备的最高 run_order

+-----------+----------------+
|  equipment| max(run_order) |
+-----------+----------------+
|1          |               7|
|2          |               1|

但我不确定是否有一种方法可以构建一个像这样工作的动态 where 子句。这样我就可以获得最后 n 次运行(包括每次运行的所有时间步长数据)。

标签: pysparkapache-spark-sql

解决方案


您可以为每个设备添加最高等级的列,并根据该列进行过滤:

from pyspark.sql import functions as F, Window

n = 3

df2 = df.withColumn(
    'max_run', 
    F.max('run_order').over(Window.partitionBy('equipment'))
).where(F.col('run_order') >= F.col('max_run') - n)

推荐阅读