首页 > 解决方案 > XML的Spark结构化流动态解析

问题描述

目前我正在使用来自 kafka 源的 xml 数据并使用 Spark 结构化流处理它们。

为了从 xml 中获取所需的信息,我正在使用 xpath。由于我想让管道更加动态,我尝试实现一个字典,其中包含要提取的列名和表达式本身。在未来的版本中,字典可能会被一些配置文件填充,而无需触及 python 作业。

不幸的是,它似乎没有按预期工作(我是一个 python 菜鸟,也许这就是为什么......)。

xml可以描述如下:

<root>
   <header eventId ="1234" .../>
   <.../>
</root>

我的 python 代码如下所示:

df = spark.readStream.format("kafka")...load()
df = df.selectExpr("CAST(timestamp) AS String)", "CAST(value AS String"))
xml_data = df
             .selectExpr("xpath(value, './root/header/@eventId')event_id", ...)
             .selectExpr("explode(arrays_zip(event_id,...)) value"
             .select('value.*')

我的下一步是定义字典:

mapping_dict = {
                'event_id' : './root/header/@eventId',
                ...
               }

我试图重建这样的表达式:

event_id = "\"xpath(value,'" + mapping_dict.get('event_id) + "') event_id\""

xml_data = df.selectExpr(event_id,...)
             .selectExpr("explode(arrays_zip(event_id,...)) value"
             .select('value.*')

现在我尝试在 selectExpr 中使用 dict 值,但它失败并出现错误

org.apache.spark.sql.AnalysisException: cannot resolve '`event_id`' given input columns: [xpath(value, './root/header/@eventId') event_id ...

所以这是我的第一个问题,第二个问题是,我想遍历这个字典并尝试从 xml 中提取每个条目。我不知道我是否可以轻松地使用结构化流来做到这一点,或者我是否必须使用 udf。如果是这样,用于此目的的 udf 会是什么样子?

干杯

标签: pythonapache-sparkpysparkspark-structured-streaming

解决方案


推荐阅读