首页 > 解决方案 > 通过与同一数据框的其他行对应的数据选择行

问题描述

我正在努力选择我的数据框的行。选择取决于同一数据框中的数据。

我的数据集看起来像这样:

from pyspark.sql.session import SparkSession
sc = SparkSession.builder.getOrCreate()
columns = ['Id', 'ActorId', 'EventId', 'Time']
vals = [(3,  3, 'START', '2020-06-22'), 
        (4,  3, 'END',   '2020-06-24'), 
        (5,  3, 'OTHER', '2019-01-15'), 
        (6,  3, 'OTHER', '2020-07-24'), 
        (7,  3, 'OTHER', '2020-06-23'), 
        (8,  4, 'START', '2018-01-15'), 
        (9,  4, 'END',   '2019-01-14'), 
        (10, 4, 'OTHER', '2018-11-14')]
events = sc.createDataFrame(vals,columns)
events.show()

结果是:

+---+-------+-------+----------+
| Id|ActorId|EventId|      Time|
+---+-------+-------+----------+
|  3|      3|  START|2020-06-22|
|  4|      3|    END|2020-06-24|
|  5|      3|  OTHER|2019-01-15|
|  6|      3|  OTHER|2020-07-24|
|  7|      3|  OTHER|2020-06-23|
|  8|      4|  START|2018-01-15|
|  9|      4|    END|2019-01-14|
| 10|      4|  OTHER|2018-11-14|
+---+-------+-------+----------+

(请记住,这只是一个示例 -> 数据提取)

我想找到 EventId==OTHER 的所有行,其中时间不在同一 ActorId 的 START 和 END 事件之间。结果应如下所示:

+---+-------+-------+----------+
| Id|ActorId|EventID|      Time|
+---+-------+-------+----------+
|  5|      3|  OTHER|2019-01-15|
|  6|      3|  OTHER|2020-07-24|
+---+-------+-------+----------+

谢谢您的帮助!!!

标签: pysparkapache-spark-sql

解决方案


这将解决您的问题 - 下面的代码中只有一个假设,即 eventId 列中的 START 和 END 将始终出现在每组的第一行和第二行中。

_w = W.partitionBy('ActorId').orderBy('ActorId')


events = events.withColumn('start_date', F.first('Time').over(_w))
events = events.withColumn('row_num', F.row_number().over(_w))
events = events.withColumn('end_date', F.when(F.col('row_num') == F.lit('2'), F.col('Time')))
events = events.withColumn('end_date', F.coalesce(F.when(F.col('row_num') == F.lit('2'), F.col('Time')), F.min('end_date').over(_w)))
events = events.withColumn('passed_col', F.when(
  (
    ((F.col('Time').cast(T.TimestampType()) > F.col('start_date').cast(T.TimestampType())) & (F.col('Time').cast(T.TimestampType()) > F.col('end_date').cast(T.TimestampType()))) |
                                                                                            
    (
      (F.col('Time').cast(T.TimestampType()) < F.col('start_date').cast(T.TimestampType()))
                                                & (F.col('Time').cast(T.TimestampType()) < F.col('end_date').cast(T.TimestampType())))),F.lit("Passed")))

events = events.select('Id', 'ActorId', 'EventId', 'Time', 'passed_col')
events.show() 

+---+-------+-------+----------+----------+
| Id|ActorId|EventId|      Time|passed_col|
+---+-------+-------+----------+----------+
|  3|      3|  START|2020-06-22|      null|
|  4|      3|    END|2020-06-24|      null|
|  5|      3|  OTHER|2019-01-15|    Passed|
|  6|      3|  OTHER|2020-07-24|    Passed|
|  7|      3|  OTHER|2020-06-23|      null|
|  8|      4|  START|2018-01-15|      null|
|  9|      4|    END|2019-01-14|      null|
| 10|      4|  OTHER|2018-11-14|      null|
+---+-------+-------+----------+----------+

最终答案后过滤---

events = events.filter(F.col('passed_col') == F.lit('Passed')).select('Id', 'ActorId', 'EventId', 'Time', 'passed_col')
events.show()

+---+-------+-------+----------+----------+
| Id|ActorId|EventId|      Time|passed_col|
+---+-------+-------+----------+----------+
|  5|      3|  OTHER|2019-01-15|    Passed|
|  6|      3|  OTHER|2020-07-24|    Passed|
+---+-------+-------+----------+----------+

推荐阅读