python - XML的Spark结构化流动态解析
问题描述
目前我正在使用来自 kafka 源的 xml 数据并使用 Spark 结构化流处理它们。
为了从 xml 中获取所需的信息,我正在使用 xpath。由于我想让管道更加动态,我尝试实现一个字典,其中包含要提取的列名和表达式本身。在未来的版本中,字典可能会被一些配置文件填充,而无需触及 python 作业。
不幸的是,它似乎没有按预期工作(我是一个 python 菜鸟,也许这就是为什么......)。
xml可以描述如下:
<root>
<header eventId ="1234" .../>
<.../>
</root>
我的 python 代码如下所示:
df = spark.readStream.format("kafka")...load()
df = df.selectExpr("CAST(timestamp) AS String)", "CAST(value AS String"))
xml_data = df
.selectExpr("xpath(value, './root/header/@eventId')event_id", ...)
.selectExpr("explode(arrays_zip(event_id,...)) value"
.select('value.*')
我的下一步是定义字典:
mapping_dict = {
'event_id' : './root/header/@eventId',
...
}
我试图重建这样的表达式:
event_id = "\"xpath(value,'" + mapping_dict.get('event_id) + "') event_id\""
xml_data = df.selectExpr(event_id,...)
.selectExpr("explode(arrays_zip(event_id,...)) value"
.select('value.*')
现在我尝试在 selectExpr 中使用 dict 值,但它失败并出现错误
org.apache.spark.sql.AnalysisException: cannot resolve '`event_id`' given input columns: [xpath(value, './root/header/@eventId') event_id ...
所以这是我的第一个问题,第二个问题是,我想遍历这个字典并尝试从 xml 中提取每个条目。我不知道我是否可以轻松地使用结构化流来做到这一点,或者我是否必须使用 udf。如果是这样,用于此目的的 udf 会是什么样子?
干杯
解决方案
推荐阅读
- c# - 使用 c# 列出安装在 Windows 10 机器上的游戏
- ios - Mac 10.14 中的 Intellij Idea 看不到 ios 模拟器
- linux - 如何在变量中添加“”字符作为自己?
- ios - 在 TableView 单元格中处理从 Firebase 检索数据的更好方法
- python - 使用 Python 导入文件 - 如果文件不存在,如何跳过?
- h2 - 如何在 H2 数据库中执行 N 次查询?
- javascript - 如何通过单击更改按钮的颜色
- r - 如何将向量写入数据表同一行的几列?
- python - 在python中创建曲率动画
- angular - 用于 Anguar 7 的 MSAL 库配置以与 tenant.b2clogin.com 一起使用