首页 > 解决方案 > 使用命名正则表达式在 pyspark 中格式化结构化的 Kafka 流

问题描述

我正在尝试从可流式传输的 pyspark 数据框中的现有列中提取多个列值。

我使用阅读流

stream_dataframe = spark_session.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", broker) \
        .option("subscribe", topic) \
        .option("startingOffsets", "earliest") \
        .load()

我目前正在拆分 value 列中的字符串并将模式应用到该列中,

assert sdf.isStreaming == True, "DataFrame doesn't receive streaming data"
# split attributes to nested array in one Column
col = split(sdf[col_name], split_str)
# now expand col to multiple top-level columns
for idx, field in enumerate(schema):
    sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
return sdf

我想使用命名的正则表达式而不是上面的。

我尝试使用下面的代码,

host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
ts_pattern = r'\[(\d{2}\/\w{3}\/\d{4}\:\d{2}\:\d{2}\:\d{2} (\+|\-)\d{4})\]'
method = r'\s([A-Z]{3,7})\s'
# url = r'\s((\/((\w+\/*\?*.(\w))+)\s))'
url = r'\s(\/[a-zA-Z0-9\/\S]+)'
protocol = r'\s([A-Z]+\/\d\.\d)\s'
status_pattern_size = r'\s(\d{3})\s(\d+)\s'
uuid_pattern = r'(([A-Za-z0-9\-]+)$)|(([0-9a-f]{32})$)'
df = df.selectExpr(regexp_extract('value', host_pattern, 1).alias('host'),
                   regexp_extract('value', ts_pattern, 1).alias('time'),
                   regexp_extract('value', method, 1).alias('http_method'),
                   regexp_extract('value', url, 1).alias('request_uri'),
                   regexp_extract('value', protocol, 1).alias('http_protocol'),
                   regexp_extract('value', status_pattern_size, 1).cast('integer').alias('response_status'),
                   regexp_extract('value', status_pattern_size, 2).cast('integer').alias('response_time'),
                   regexp_extract('value', uuid_pattern, 1).alias('instance_id'))

这给我一个错误的说法Column is not iterable

我想改用以下名称正则表达式,因为上面会导致多个 regexp_extract 调用,

(?P<host>\S+)\s+\S+\s+(?P<user>\S+)\s+\[(?P<time>.*?)\]\s+(?P<http_method>\S+)\s+(?P<request_uri>\S+)\s+(?P<http_protocol>\S+)\s+(?P<response_status>\S+)\s+(?P<reponse_time>\S+)\s+(?P<instance_id>\S+)

用于提取相应列的值。是否可以在可流式传输的 pyspark 数据帧上做到这一点?

标签: pythonpysparkspark-structured-streamingpyspark-dataframesspark-streaming-kafka

解决方案


推荐阅读