dataframe - PySpark Filter between - 根据组提供上限和下限列表
问题描述
我有一个 PySpark 数据框,并想过滤上限和下限之间的行。通常,我只会使用介于以下之间的过滤器:
import pandas as pd
from pyspark.sql import functions as F
... sql_context creation ...
pdfRaw=pd.DataFrame([{"vehicleID":'A', "Segment":'State Hwy', "speed":68.0},\
{"vehicleID":'B', "Segment":'State Hwy', "speed":76.0}])
dfRaw = sql_context.createDataFrame(pdfRaw).withColumn("vehicleID", "Segment", "speed")
dfRaw.show()
+-----------+------------+-----+
vehicleID| Segment|value|
+-----------+------------+-----+
| A| State Hwy| 68.0|
| B| State Hwy| 73.0|
+-----------+------------+-----+
dfRaw.filter(F.col("speed").between(70,75)).show()
+-----------+------------+-----+
vehicleID| Segment|value|
+-----------+------------+-----+
| B| State Hwy| 73.0|
+-----------+------------+-----+
但是,我有多个速度值,我想在它们之间进行过滤。
Speeds_Curious = {
[25,30],
[55,60],
[60,65],
[70,75]
}
我实际上想更进一步。过滤器的上下限取决于前一个数据帧的分组结果。
df_RoadSegments.groupby('Segment')\
.agg(F.min('SpeedLimit').alias('minSpeed'),\
F.max('SpeedLimit').alias('maxSpeed'))\
.show()
+-----------+----------+----------+
Segment| minSpeed| maxSpeed|
+-----------+----------+----------+
| Urban| 25.0| 30.0|
| State Hwy| 55.0| 60.0|
|I-State Hwy| 60.0| 65.0|
|I-State Hwy| 70.0| 75.0|
+-----------+----------+----------+
所以基本上我想在不同数据帧上作为列可用的值之间过滤数据帧。
就像是:
dfLimits = df_RoadSegments.groupby('Segment')\
.agg(F.min('SpeedLimit').alias('minSpeed'),\ F.max('SpeedLimit').alias('maxSpeed'))
dfRaw.groupby('Segment')\
.filter(F.col("speed")\
.between(dfLimits.where(dfLimits.Segment=="State Hwy"(??)).select('minSpeed')),\
dfLimits.where(dfLimits.Segment=="State Hwy"(??)).select('maxSpeed'))))\
.show()
有什么想法吗?
解决方案
以下方法将为您提供在它们所属的特定部分的速度min
和速度之间的所有车辆。max
您可以加入两个数据框:
df_joined = dfRaw.join(dfLimits, on="Segment", how="left")
+---------+---------+-----+--------+--------+
| Segment|vehicleID|speed|minSpeed|maxSpeed|
+---------+---------+-----+--------+--------+
|State Hwy| A| 68.0| 55| 60|
|State Hwy| B| 76.0| 55| 60|
+---------+---------+-----+--------+--------+
如果您想要进一步标记速度是否在上述范围之间,那么您可以编写:
flag_df = df_joined.withColumn("flag", F.when((F.col("speed") > F.col("minSpeed")) & (F.col("speed") < F.col("minSpeed")), 1).otherwise(0))
flag_df.show()
+---------+---------+-----+--------+--------+----+
| Segment|vehicleID|speed|minSpeed|maxSpeed|flag|
+---------+---------+-----+--------+--------+----+
|State Hwy| A| 68.0| 55| 60| 0|
|State Hwy| B| 76.0| 55| 60| 0|
+---------+---------+-----+--------+--------+----+
然后,您可以简单地过滤标志:
df_final = df.filter(F.col("flag") == 1)
推荐阅读
- couchdb - couchdb 的 Hyperledger Fabric 性能问题
- java - java中矩阵中的值分配
- java - Spring-integration:如何正确调用 SOAP Web 服务?
- mongodb - 带有父引用的集合的 MongoDB 自连接(聚合)
- wpf - 在纯 XAML 中绘制两个元素之间的连接线
- android - 房间特权与查询的关系
- javascript - 使用对象访问javascript中自调用函数的函数
- wordpress - 在客户的电子邮件中添加链接以在 Woocommerce 中基于订单创建事件
- sql-server - 在发布者上添加列
- swift - 何时应使用异步方法的合适挂钟时间阈值?