PySpark: performing a logical AND on timestamps

Problem description

I have a table made of a main ID, a sub ID and two timestamps (start and end):

+-------+---------------------+---------------------+---------------------+
|main_id|sub_id               |start_timestamp      |end_timestamp        |
+-------+---------------------+---------------------+---------------------+
|      1|  1                  |  2021/06/01 19:00   |  2021/06/01 19:10   |
|      1|  2                  |  2021/06/01 19:01   |  2021/06/01 19:10   |
|      1|  3                  |  2021/06/01 19:01   |  2021/06/01 19:05   |
|      1|  3                  |  2021/06/01 19:07   |  2021/06/01 19:09   |
+-------+---------------------+---------------------+---------------------+
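For anyone who wants to reproduce the example, here is a minimal sketch that builds this table (it assumes the timestamps are stored as plain strings, which matches the schema shown in the answer below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (1, 1, "2021/06/01 19:00", "2021/06/01 19:10"),
        (1, 2, "2021/06/01 19:01", "2021/06/01 19:10"),
        (1, 3, "2021/06/01 19:01", "2021/06/01 19:05"),
        (1, 3, "2021/06/01 19:07", "2021/06/01 19:09"),
    ],
    ["main_id", "sub_id", "start_timestamp", "end_timestamp"],
)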

My goal is to select all records sharing the same main_id (across different sub_ids) and perform a logical AND on the timestamp columns, i.e. find the periods during which all sub_ids are active at the same time. In the example above, all sub_ids are simultaneously active from 19:01 to 19:05 and again from 19:07 to 19:09 (sub_id 3 is active over two separate windows), so the expected output is:

+-------+---------------------------+---------------------------+
|main_id|  global_start             |  global_stop              |
+-------+---------------------------+---------------------------+
|      1|  2021/06/01 19:01         |  2021/06/01 19:05         |
|      1|  2021/06/01 19:07         |  2021/06/01 19:09         |
+-------+---------------------------+---------------------------+

Thanks!

Tags: pyspark, timestamp

Solution


Partial solution

This kind of logic can be really hard to implement in pure Spark; the built-in functions are not enough for it. Also, the expected output has two lines, whereas a simple group by on main_id should only output one. The logic behind this is therefore not trivial.

I advise you to group the data by main_id and process it in Python with a UDF.

from pyspark.sql import functions as F

# Agg your data by main_id
df2 = (
    df.groupby("main_id", "sub_id")
    .agg(
        F.collect_list(F.struct("start_timestamp", "end_timestamp")).alias("timestamps")
    )
    .groupby("main_id")
    .agg(F.collect_list(F.struct("sub_id", "timestamps")).alias("data"))
)

df2.show(truncate=False)
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|main_id|data                                                                                                                                                                         |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |[[3, [[2021/06/01 19:01, 2021/06/01 19:05], [2021/06/01 19:07, 2021/06/01 19:09]]], [1, [[2021/06/01 19:00, 2021/06/01 19:10]]], [2, [[2021/06/01 19:01, 2021/06/01 19:10]]]]|
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

df2.printSchema()
root
 |-- main_id: long (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sub_id: long (nullable = true)
 |    |    |-- timestamps: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- start_timestamp: string (nullable = true)
 |    |    |    |    |-- end_timestamp: string (nullable = true)

Once this step is done, you can process the data column with Python and perform your logical AND.

@F.udf  # Add required type depending on function return
def process(data):
    """
    data is a complex type (see df2.printSchema()):
      list(dict(
        "sub_id": value_of_sub_id,
        "timestamps": list(dict(
            "start_timestamp": value_of_start_timestamp,
            "end_timestamp": value_of_end_timestamp,
        ))
      ))
    """
    ... # implement the "logical AND" here.

df2.select(
    "main_id",
    process(F.col("data"))
)
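For illustration, here is one way that UDF could be filled in: parse each sub_id's rows into a sorted list of (start, end) intervals, then intersect those lists pairwise. This is only a sketch built on assumptions that are not part of the original answer: the timestamps are strings in the yyyy/MM/dd HH:mm format shown above, and the intervals of a single sub_id never overlap each other. The FMT constant, the intersect helper and result_schema are names introduced here for the example.

from datetime import datetime
from functools import reduce

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

FMT = "%Y/%m/%d %H:%M"

def intersect(a, b):
    # Intersect two lists of (start, end) tuples, each sorted by start.
    out, i, j = [], 0, 0
    while i < len(a) and j < len(b):
        start = max(a[i][0], b[j][0])
        end = min(a[i][1], b[j][1])
        if start < end:  # keep only overlaps with a positive duration
            out.append((start, end))
        # Advance whichever interval ends first.
        if a[i][1] < b[j][1]:
            i += 1
        else:
            j += 1
    return out

result_schema = ArrayType(
    StructType([
        StructField("global_start", StringType()),
        StructField("global_stop", StringType()),
    ])
)

@F.udf(returnType=result_schema)
def process(data):
    # One sorted list of (start, end) datetimes per sub_id.
    per_sub = [
        sorted(
            (datetime.strptime(t["start_timestamp"], FMT),
             datetime.strptime(t["end_timestamp"], FMT))
            for t in entry["timestamps"]
        )
        for entry in data
    ]
    # The "logical AND" is the intersection of every sub_id's intervals.
    common = reduce(intersect, per_sub)
    return [(s.strftime(FMT), e.strftime(FMT)) for s, e in common]

Exploding the returned array then gives one row per common window, which for the sample data above should match the two expected lines:

df2.select(
    "main_id",
    F.explode(process("data")).alias("window")
).select("main_id", "window.global_start", "window.global_stop")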

I hope this helps you or someone else build the final solution.
