pyspark - 通过与同一数据框的其他行对应的数据选择行
问题描述
我正在努力选择我的数据框的行。选择取决于同一数据框中的数据。
我的数据集看起来像这样:
from pyspark.sql.session import SparkSession
sc = SparkSession.builder.getOrCreate()
columns = ['Id', 'ActorId', 'EventId', 'Time']
vals = [(3, 3, 'START', '2020-06-22'),
(4, 3, 'END', '2020-06-24'),
(5, 3, 'OTHER', '2019-01-15'),
(6, 3, 'OTHER', '2020-07-24'),
(7, 3, 'OTHER', '2020-06-23'),
(8, 4, 'START', '2018-01-15'),
(9, 4, 'END', '2019-01-14'),
(10, 4, 'OTHER', '2018-11-14')]
events = sc.createDataFrame(vals,columns)
events.show()
结果是:
+---+-------+-------+----------+
| Id|ActorId|EventId| Time|
+---+-------+-------+----------+
| 3| 3| START|2020-06-22|
| 4| 3| END|2020-06-24|
| 5| 3| OTHER|2019-01-15|
| 6| 3| OTHER|2020-07-24|
| 7| 3| OTHER|2020-06-23|
| 8| 4| START|2018-01-15|
| 9| 4| END|2019-01-14|
| 10| 4| OTHER|2018-11-14|
+---+-------+-------+----------+
(请记住,这只是一个示例 -> 数据提取)
我想找到 EventId==OTHER 的所有行,其中时间不在同一 ActorId 的 START 和 END 事件之间。结果应如下所示:
+---+-------+-------+----------+
| Id|ActorId|EventID| Time|
+---+-------+-------+----------+
| 5| 3| OTHER|2019-01-15|
| 6| 3| OTHER|2020-07-24|
+---+-------+-------+----------+
谢谢您的帮助!!!
解决方案
这将解决您的问题 - 下面的代码中只有一个假设,即 eventId 列中的 START 和 END 将始终出现在每组的第一行和第二行中。
_w = W.partitionBy('ActorId').orderBy('ActorId')
events = events.withColumn('start_date', F.first('Time').over(_w))
events = events.withColumn('row_num', F.row_number().over(_w))
events = events.withColumn('end_date', F.when(F.col('row_num') == F.lit('2'), F.col('Time')))
events = events.withColumn('end_date', F.coalesce(F.when(F.col('row_num') == F.lit('2'), F.col('Time')), F.min('end_date').over(_w)))
events = events.withColumn('passed_col', F.when(
(
((F.col('Time').cast(T.TimestampType()) > F.col('start_date').cast(T.TimestampType())) & (F.col('Time').cast(T.TimestampType()) > F.col('end_date').cast(T.TimestampType()))) |
(
(F.col('Time').cast(T.TimestampType()) < F.col('start_date').cast(T.TimestampType()))
& (F.col('Time').cast(T.TimestampType()) < F.col('end_date').cast(T.TimestampType())))),F.lit("Passed")))
events = events.select('Id', 'ActorId', 'EventId', 'Time', 'passed_col')
events.show()
+---+-------+-------+----------+----------+
| Id|ActorId|EventId| Time|passed_col|
+---+-------+-------+----------+----------+
| 3| 3| START|2020-06-22| null|
| 4| 3| END|2020-06-24| null|
| 5| 3| OTHER|2019-01-15| Passed|
| 6| 3| OTHER|2020-07-24| Passed|
| 7| 3| OTHER|2020-06-23| null|
| 8| 4| START|2018-01-15| null|
| 9| 4| END|2019-01-14| null|
| 10| 4| OTHER|2018-11-14| null|
+---+-------+-------+----------+----------+
最终答案后过滤---
events = events.filter(F.col('passed_col') == F.lit('Passed')).select('Id', 'ActorId', 'EventId', 'Time', 'passed_col')
events.show()
+---+-------+-------+----------+----------+
| Id|ActorId|EventId| Time|passed_col|
+---+-------+-------+----------+----------+
| 5| 3| OTHER|2019-01-15| Passed|
| 6| 3| OTHER|2020-07-24| Passed|
+---+-------+-------+----------+----------+
推荐阅读
- html - 操作复选框以在未选中时让 CSS 旋转
- node.js - Google Sheets API NodeJS - 在不更改格式的情况下更新单元格
- .htaccess - 将这两种条件格式组合在一个 htaccess 中
- node.js - 返回具有相同日期的对象集合(MongoDB/Mongoose)
- python - 如何在 matplotlib 中标记空间刻度线并自定义网格线
- java - Spring Hibernate - 使用属性转换器将实体列表保留为 ID 数组
- python - 等效于 pybind11 中的 boost::python py::scope().attr()
- c++ - 从 C++ 问题中的函数返回数组地址
- php - php fputcsv 保存为二进制文件
- bash - 使用 at 命令时命令不执行