python - PySpark regexp_replace does not work as expected for the following pattern
Problem description
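I am using Spark Streaming to consume from a topic and transform the data. One of the transformations is a regex replacement, but the `regexp_replace` function from `pyspark.sql.functions` does not replace the following pattern (I tested it beforehand with regex101.com, Python's `re`, etc.):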
```python
df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,0-9]+\])',r'$1'))
```
This is a fragment of the record:
```
{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500,
```
This is the "target" of the regex pattern:
```
someMessage=Test. [BL056]
```
It should match the whole target, capture it in two groups, and replace it with the first group alone (via `r'$1'`).
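As a sanity check outside Spark, the same pattern can be run through Python's `re` on the record fragment above. One thing this makes visible: in a replacement string, Python refers to group 1 as `\1`, while Spark's `regexp_replace` (which is backed by Java regular expressions) uses `$1`. In Python, `'$1'` has no special meaning and is inserted literally, so a Python test with `r'$1'` does not exercise the same replacement Spark performs. A minimal sketch, using only the standard library:

```python
import re

# Record fragment copied from the question
record = ("{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], "
          "someOrigin=MOBILE, someStatus=TEST, duration=3500,")

# The question's pattern as a raw string, so backslashes reach the regex engine untouched
pattern = r'([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,0-9]+\])'

# Python spells "group 1" as \1 in the replacement: keeps "someMessage=Test. ", drops "[BL056]"
python_style = re.sub(pattern, r'\1', record)

# '$1' is NOT a group reference for re.sub; the whole match is replaced by the text "$1"
dollar_style = re.sub(pattern, '$1', record)

print(python_style)
print(dollar_style)
```

In Spark itself the replacement would stay `'$1'`; only the Python check needs `\1`.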
These patterns don't work either:
```python
df.withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)',''))
df.withColumn('value', f.regexp_replace('value', '(\[[A-Z,a-z,0-9]+\])',''))
```
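For reference, this is what those two patterns would be expected to remove, checked with Python's `re` on the same fragment (Java's engine treats these constructs the same way). The first removes only the `key=` prefixes, because a lazy `[^,]*?` with nothing after it is satisfied by zero characters; the second removes only the bracketed code. A minimal sketch:

```python
import re

record = ("{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], "
          "someOrigin=MOBILE, someStatus=TEST, duration=3500,")

# Lazy [^,]*? at the end of a pattern matches zero characters, so only "key=" is removed
keys_removed = re.sub(r'([A-Za-z]+=[^,]*?)', '', record)

# Removes just the bracketed code, leaving "someMessage=Test. " in place
brackets_removed = re.sub(r'(\[[A-Z,a-z,0-9]+\])', '', record)

print(keys_removed)
print(brackets_removed)
```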
This works:
```python
df.withColumn('value', f.regexp_replace('value', 'someMessage=Test. [BL056]',''))
```
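One detail that may be worth re-checking about this literal pattern: read as a regular expression, `.` means "any character" and `[BL056]` is a character class matching a single character from B, L, 0, 5, 6, not the literal text `[BL056]`. Java and Python agree on this, so a quick check with Python's `re` suggests the unescaped string should not match the record text, while an escaped version does. A minimal sketch (the `"someMessage=Test! B"` string is made up for illustration):

```python
import re

text = "someMessage=Test. [BL056]"

# Unescaped: '.' is any character and '[BL056]' matches ONE of B, L, 0, 5, 6
unescaped = re.search(r'someMessage=Test. [BL056]', text)

# Escaped: '.', '[' and ']' are literal, so the whole target matches
escaped = re.search(r'someMessage=Test\. \[BL056\]', text)

# The unescaped pattern matches text shaped like this instead (made-up example)
other = re.search(r'someMessage=Test. [BL056]', "someMessage=Test! B")

print(unescaped, escaped, other)
```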
Why does this happen? Does Spark's regex engine have some peculiarity? What is the correct pattern for what I'm trying to do?
An example value and the whole script are listed below.
This is an example value of the `value` column:
```
{someVersion=8.3.2-hmg-dev, someUnitName=IB, someMessage=Test. [BL056], someOrigin=MOBILE, someStatus=TEST, duration=3500, someNumber=9872329, someAppOrigin=APP_PADRAO, someId=c3ASAUSQTiWvl_YA9DYpDV:APA91bGfVcLNNGL20hfmaDDS0D8TuzJDuCjj4tgbRNcJcYASIBRVEE2FnA4exnE4ZWTuupRX7FQkdcJiMWkNEatk8lktkFcpR7P7mehb4r_SVnabIabGInjagGZ6pGyweDkxW2JUGK8g, someType=00001, someOriginOpen=null, someOS=null, eventSubType=TESTLOGON, someToken=, ip=error, somePair=0.4220043,-1.084015, eventType=SUCESSO, someMag=aWg4V01qSxDMjAvWmlEWGJ6aExnc2nZJbWZVPQ==, macAddress=33d94a3f7d2f8aff, someJSON={"ip":"error","hostname":null,"type":null,"concode":null,"continent":null,"country":null,"country_name":null,"code":null,"name":null,"city":null,"zip":null,"latitude":null,"longitude":null,"anotherJSON":{"id":null,"capital":null,"languages":null,"flag":null,"flag_emoji":null,"flag_emoji_unicode":null,"calling_code":null,"is_eu":null},"time_zone":{"id":null,"current_time":null,"gmt_offset":null,"code":null,"is_daylight_saving":null},"currency":{"code":null,"name":null,"plural":null,"symbol":null,"symbol_native":null},"connection":{"asn":null,"isp":null},"security":{"is_proxy":null,"proxy_type":null,"is_crawler":null,"crawler_name":null,"crawler_type":null,"is_tor":null,"threat_level":null,"threat_types":null}}, organization=IBPF, codigoCliente=440149, device=Android SDK built for x86, eventDate=6/1/20 4:03 PM}
```
This is the whole script:
```python
import re
import json
import pyhocon
import fastavro
import requests
from io import BytesIO
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()

# SCHEMA_REGISTRY, SUBSCRIBE_TOPIC, HOST, KEYSTORE_PATH and KEYSTORE_PASSWORD
# are configuration constants defined elsewhere (not shown in the question).


def decode(msg, schema):
    bytes_io = BytesIO(msg)
    # Skip the 5-byte header (magic byte + 4-byte schema ID) before the Avro payload
    bytes_io.seek(5)
    msg = fastavro.schemaless_reader(bytes_io, schema)
    return msg


def parse(msg):
    # Parse HOCON text and convert it to a Python dict via JSON
    conf = pyhocon.ConfigParser.parse(msg)
    msg_converter = pyhocon.tool.HOCONConverter.to_json(conf)
    msg = json.loads(msg_converter)
    return msg


def get_schema(registry_url, topic):
    # Fetch the latest schema registered for the topic
    URL = f'{registry_url}/subjects/{topic}/versions/latest'
    response = requests.get(url=URL, verify=False)
    subject = response.json()
    schema_id = subject['id']
    schema = json.loads(subject['schema'])
    return [schema_id, schema]


schema_id, schema = get_schema(registry_url=SCHEMA_REGISTRY, topic=SUBSCRIBE_TOPIC)

spark.udf.register('decode', lambda value: decode(value, schema))
spark.udf.register('parse', parse)

spark.readStream \
    .format('kafka') \
    .option('subscribe', SUBSCRIBE_TOPIC) \
    .option('startingOffsets', 'earliest') \
    .option('kafka.bootstrap.servers', HOST) \
    .option('kafka.security.protocol', 'SSL') \
    .option('kafka.ssl.key.password', KEYSTORE_PASSWORD) \
    .option('kafka.ssl.keystore.location', KEYSTORE_PATH) \
    .option('kafka.ssl.truststore.location', KEYSTORE_PATH) \
    .option('kafka.ssl.keystore.password', KEYSTORE_PASSWORD) \
    .option('kafka.ssl.truststore.password', KEYSTORE_PASSWORD) \
    .load() \
    .selectExpr('decode(value) as value') \
    .withColumn('value', f.regexp_replace('value', '([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,1-9]+\])', '$1')) \
    .writeStream \
    .format('console') \
    .option('truncate', 'false') \
    .start()
```
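Comparing the two versions of the pattern, one difference may matter (an editorial observation, not something the question or answer confirms): the `.withColumn` call in the script uses the class `[A-Z,a-z,1-9]`, while the first snippet uses `[A-Z,a-z,0-9]`. The range `1-9` does not include `0`, so the script's pattern cannot match `[BL056]`. Also, the commas inside both classes are literal characters rather than separators, so `[A-Za-z0-9]` is probably what was meant. A quick check with Python's `re`, which handles these character classes the same way Java does:

```python
import re

fragment = "someMessage=Test. [BL056], someOrigin=MOBILE"

# Class as written in the first snippet of the question (0-9)
pattern_snippet = r'([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,0-9]+\])'
# Class as written in the streaming script (1-9, so '0' is not accepted)
pattern_script = r'([A-Za-z]+=[^,]*?)(\[[A-Z,a-z,1-9]+\])'

m_snippet = re.search(pattern_snippet, fragment)
m_script = re.search(pattern_script, fragment)  # the '0' in BL056 breaks the match

print(m_snippet)
print(m_script)
```

If the script version is what actually ran, changing the class to include `0` is a cheap first thing to try.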
Solution
If you just want to extract the match, use `regexp_extract`; if you want to replace it, use `regexp_replace`.
My working regular expressions are:
```python
from pyspark.sql.functions import regexp_extract, regexp_replace

df.select(regexp_extract('value', r'someMessage=\w+\.\ \[\w+\]', 0)).show(2, False)
# and
df.select(regexp_extract('value', r'someMessage=(.*)]', 0)).show(2, False)
```
```
+-------------------------------------------+
|val                                        |
+-------------------------------------------+
|someMessage=Test. [BL056]                  |
|someMessage=Test. [BL056]                  |
+-------------------------------------------+
```
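The two extraction patterns above can be checked the same way with Python's `re` (these constructs behave the same in Java's engine). A caveat on the second one: `(.*)` is greedy, so on a value that contains another `]` later on, it runs to the last `]`; a bounded class such as `[^\]]*` stops at the first one. The `later` string below is a made-up input for illustration, not data from the question:

```python
import re

value = "someMessage=Test. [BL056], someOrigin=MOBILE"

# First pattern from the answer: group 0 is the whole "someMessage=... [...]" match
m1 = re.search(r'someMessage=\w+\.\ \[\w+\]', value)

# Second pattern from the answer: greedy (.*) runs up to the LAST ']' in the string
m2 = re.search(r'someMessage=(.*)]', value)

# Hypothetical value with a later ']' to show how the greedy version overshoots
later = "someMessage=Test. [BL056], someTag=[X1]"
greedy = re.search(r'someMessage=(.*)]', later)
bounded = re.search(r'someMessage=[^\]]*\]', later)

print(m1.group(0), m2.group(0), greedy.group(0), bounded.group(0), sep="\n")
```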
And if you want to replace it, use this:
```python
df.select(regexp_replace('value', 'someMessage=(.*)]', '')).show(2, False)
```