sql - 在包含相同 ID 的两行之间提取 ID 和不同的错误
问题描述
我有一个包含两列(“time_stamp”和“message”)的 Spark 数据框。
示例数据框:
Time_stamp Message
2020-12-01 05:28:34:215 some text1 ID: 1
2020-12-01 05:28:40:210 some text2 error: A
2020-12-01 05:28:40:220 some text3 error: B
2020-12-01 05:28:41:203 some text4 error: A
2020-12-01 05:30:43:201 some text5 ID: 1
2020-12-01 05:32:50:215 some text6 ID: 2
2020-12-01 05:32:50:220 some text7 error: A
2020-12-01 05:48:51:220 some text8 error: C
2020-12-01 05:48:52:203 some text9 error: B
2020-12-01 05:51:53:201 some text10 ID: 2
我想在包含相同 id 的两行之间制作另一个带有 ID 和明显错误的数据框。
预期输出:
示例表:
ID Error
1 A
1 B
2 A
2 C
2 B
我尝试了以下代码。但是,它使用 Azure Databricks 不支持的 windows 功能,并且代码需要很长时间才能执行。
import pyspark.sql.functions as F
from pyspark.sql.window import Window
df2 = df.withColumn(
'Time_stamp',
F.to_timestamp('Time_stamp', 'yyyy-MM-dd HH:mm:ss:SSS')
).withColumn(
'ID',
F.regexp_extract('Message', 'ID: ([a-zA-Z0-9]+)', 1)
).withColumn(
'ID',
F.last(F.when(F.col('ID') != '', F.col('ID')), True).over(Window.orderBy('Time_stamp'))
).filter(
F.col('message').rlike('error')
).withColumn(
'Message',
F.regexp_extract('Message', 'error: (.*)', 1)
).groupBy('ID').agg(
F.collect_set(F.array('Message', 'Time_stamp')).alias('Message')
).select(
'ID',
F.explode('Message').alias('Message')
).selectExpr(
'ID',
'Message[0] as error',
'Message[1] as Time_stamp'
).withColumn(
'rn',
F.row_number().over(Window.partitionBy('ID', 'error').orderBy('Time_stamp'))
).filter('rn = 1').orderBy('Time_stamp').select('ID', 'error')
任何人都可以提供 SQL 的解决方案吗?PySpark SQL 在 Azure 数据块中得到很好的支持。
解决方案
没什么好说的,除了我认为 pyspark 看起来比 spark SQL 更好......
df.createOrReplaceTempView('df')
result = spark.sql("""
select ID, error
from (
select *, row_number() over (partition by ID, error order by Time_stamp) rn
from (
select ID, Message[0] error, Message[1] Time_stamp
from (
select ID, explode(Message) Message
from (
select ID, collect_set(array(Message, Time_stamp)) Message
from (
select Time_stamp, regexp_extract(Message, 'error: (.*)', 1) Message, ID
from (
select Time_stamp, Message, last(case when ID != '' then ID end, true) over (order by Time_stamp) ID
from (
select to_timestamp(Time_stamp, 'yyyy-MM-dd HH:mm:ss:SSS') Time_stamp, Message, regexp_extract(Message, 'ID: ([a-zA-Z0-9]+)', 1) ID
from df
)
) where Message rlike 'error'
) group by ID
)
)
)
) where rn = 1 order by Time_stamp""")
result.show()
+---+-----+
| ID|error|
+---+-----+
| 1| A|
| 1| B|
| 2| A|
| 2| C|
| 2| B|
+---+-----+
推荐阅读
- javascript - JS 更改 FullCalendar v4 事件显示
- cmd - tf.exe 抱怨“TF30063:您无权访问”
- sas - 匹配两个不同的数据集
- sql - 从 BigQuery 中的选择创建字符串分区表
- google-cloud-platform - 帐户登录审计和跟踪活动
- azure - 如何防止基于存储访问策略的 Azure 存储共享访问签名缓存在浏览器中
- python - 什么是 Python 2 的 multidict 易于适应的替代方案?
- java - Gson 中是否有类似 JsonFormat 的内容?
- javascript - 向 iCal 数据/日历对象添加警报
- wordpress - 引导菜单的固定顶级类在 WordPress 的大型设备上不起作用