apache-spark - 在 Spark 数据框中的子字符串之后提取第一次出现的字符串?
问题描述
我有一个 PySpark 数据框(df),它有一个 time_stamp 列和一个消息列(数据类型 str),如下所示:
示例数据框
message time_stamp
some irrelevant text 2015-01-23 08:27:18
irrelevant_text start : string 2015-01-23 08:27:34
contributor Id :XYZ_ABCD 2015-01-23 08:27:54
some irrelevant text 2015-01-23 08:28:36
contributor Id :XYZ_ABCD 2015-01-23 08:28:55
some irrelevant text 2015-01-23 08:29:36
contributor Id :MNOP_xyz 2015-01-23 08:29:45
some irrelevant text 2015-01-23 08:29:30
irrelevant_text end : string 2015-01-23 08:30:47
some irrelevant text 2015-01-23 08:30:59
irrelevant_text start : string 2015-01-23 08:31:34
contributor Id :EFG_A 2015-01-23 08:31:54
some irrelevant text 2015-01-23 08:32:05
contributor Id :pqr_wx 2015-01-23 08:32:15
some irrelevant text 2015-01-23 08:32:26
contributor Id :pqr_wx 2015-01-23 08:33:01
some irrelevant text 2015-01-23 08:33:09
irrelevant_text end : string 2015-01-23 08:40:34
some irrelevant text 2015-01-23 08:40:47
irrelevant_text start : string 2015-01-23 09:31:34
contributor Id :lmo_uvw 2015-01-23 09:31:54
some irrelevant text 2015-01-23 09:32:05
contributor Id :xlr_mot 2015-01-23 09:32:15
some irrelevant text 2015-01-23 09:32:26
irrelevant_text end : string 2015-01-23 09:40:34
some irrelevant text 2015-01-23 09:40:47
我希望在贡献者 ID 之后提取字符串的第一次出现:在开始:字符串和结束:字符串之间以及那些贡献者 ID:只出现一次,并丢弃不是第一次出现的 ID。一个日期可能有多个此类实例。
预期输出:
time_stamp ID
2015-01-23 08:27:54 XYZ_ABCD
2015-01-23 08:29:45 MNOP_xyz
2015-01-23 08:31:54 EFG_A
2015-01-23 08:32:15 pqr_wx
2015-01-23 08:31:54 lmo_uvw
2015-01-23 08:32:15 xlr_mot
解决方案
为ID和开始/结束时间戳的每个分区分配一个行号,并过滤行号为1的行。
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
'begin',
F.last(
F.when(F.col('message').rlike('start'), F.col('time_stamp')), True
).over(Window.orderBy('time_stamp'))
).withColumn(
'end',
F.first(
F.when(F.col('message').rlike('end'), F.col('time_stamp')), True
).over(Window.orderBy('time_stamp').rowsBetween(0, Window.unboundedFollowing))
).withColumn(
'ID',
F.regexp_extract('message', 'contributor Id :(\S+)', 1)
).filter(
"ID != '' and begin is not null and end is not null"
).withColumn(
'rn',
F.row_number().over(Window.partitionBy('ID', 'begin', 'end').orderBy('time_stamp'))
).filter(
'rn = 1'
).select(
'time_stamp', 'ID'
).orderBy('time_stamp')
df2.show()
+-------------------+--------+
| time_stamp| ID|
+-------------------+--------+
|2015-01-23 08:27:54|XYZ_ABCD|
|2015-01-23 08:29:45|MNOP_xyz|
|2015-01-23 08:31:54| EFG_A|
|2015-01-23 08:32:15| pqr_wx|
|2015-01-23 09:31:54| lmo_uvw|
|2015-01-23 09:32:15| xlr_mot|
+-------------------+--------+
推荐阅读
- ios - WebView 退出时注销帐户/清除导航
- javascript - 如何使用 node.js 修改我的 HTML 文件?
- javascript - 如何正确处理 ASP.NET Core MVC 中的 AJAX 错误?
- azure-ad-b2c - 在 Azure AD B2C 自定义策略中调用 api 以进行邮政地址验证
- flutter - 如何处理具有相同动画的大量元素?
- python - AZURE ML,python 代码获取:'tls_process_server_certificate','证书验证失败'
- excel - VBA 运行时错误。'1004':复制区域和粘贴区域的大小不同
- python - 从睡眠模式通过python代码打开监视器
- python - 以一个模式开始但不以另一个模式结束的正则表达式
- spring - Spring Boot Starter:重试事务问题