pyspark - PySpark:对时间戳执行逻辑与
问题描述
我有一个由主 ID、子 ID 和两个时间戳(开始-结束)组成的表。
+-------+---------------------+---------------------+---------------------+
|main_id|sub_id |start_timestamp |end_timestamp |
+-------+---------------------+---------------------+---------------------+
| 1| 1 | 2021/06/01 19:00 | 2021/06/01 19:10 |
| 1| 2 | 2021/06/01 19:01 | 2021/06/01 19:10 |
| 1| 3 | 2021/06/01 19:01 | 2021/06/01 19:05 |
| 1| 3 | 2021/06/01 19:07 | 2021/06/01 19:09 |
我的目标是选择所有具有相同 mainID(不同 subID)的记录,并在时间戳列上执行逻辑 AND(目标是找到所有 subID 都处于活动状态的时段)。
+-------+---------------------------+---------------------------+
|main_id| global_start | global_stop |
+-------+---------------------------+---------------------------+
| 1| 2021/06/01 19:01 | 2021/06/01 19:05 |
| 1| 2021/06/01 19:07 | 2021/06/01 19:09 |
谢谢!
解决方案
部分解决方案
这种逻辑在纯 Spark 中可能真的很难实现。内置函数还不够。此外,预期的输出是 2 行,但一个简单的 group bymain_id
应该只输出一行。因此,背后的逻辑并非微不足道。
我建议您main_id
使用 UDF 对数据进行分组并使用 python 处理它们。
# Agg your data by main_id
df2 = (
df.groupby("main_id", "sub_id")
.agg(
F.collect_list(F.struct("start_timestamp", "end_timestamp")).alias("timestamps")
)
.groupby("main_id")
.agg(F.collect_list(F.struct("sub_id", "timestamps")).alias("data"))
)
df2.show(truncate=False)
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|main_id|data |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |[[3, [[2021/06/01 19:01, 2021/06/01 19:05], [2021/06/01 19:07, 2021/06/01 19:09]]], [1, [[2021/06/01 19:00, 2021/06/01 19:10]]], [2, [[2021/06/01 19:01, 2021/06/01 19:10]]]]|
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
df2.printSchema()
root
|-- main_id: long (nullable = true)
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- sub_id: long (nullable = true)
| | |-- timestamps: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- start_timestamp: string (nullable = true)
| | | | |-- end_timestamp: string (nullable = true)
完成此步骤后,您可以data
使用 Python 处理该列并执行您的logical AND
.
@F.udf # Add required type depending on function return
def process(data):
"""
data is a complex type (see df2.printSchema()) :
list(dict(
"sub_id": value_of_sub_id,
"timestamps": list(dict(
"start_timestamp": value_of_start_timestamp,
"end_timestamp": value_of_end_timestamp,
))
))
"""
... # implement the "logical AND" here.
df2.select(
"main_id",
process(F.col("data"))
)
我希望这可以帮助您或其他人构建最终解决方案。
推荐阅读
- matplotlib - 知识分享:Matplotlib 按钮和鼠标处理程序
- excel - 将 INDEX-MATCH 与可互换的字符串一起使用
- c++ - 是否可以在 C++ 中将父对象强制转换为子对象?
- python - 加快python中的动画线图
- bash - Linux sed 删除带有模式的 2 行
- java - 创建依赖于具有自己的 bootstrap.yml 的 Spring Boot 的库
- python - 在python中将字符串日期转换为日期时间,其中字符串格式为python中的YYYY-MM-DD Hr-Min-Sec
- java - javax.xml.crypto.URIReferenceException:无法解析具有 ID 对象的元素
- regex - 正则表达式选择一两行
- powerbi - 在 PowerBI 中使用日期表动态计算度量