首页 > 解决方案 > Spark结构化流上的Xml解析

问题描述

我正在尝试在 Databricks 上的 PySpark Structured Streaming 中使用 Kinesis 源分析数据。

我创建了一个数据框,如下所示。

kinDF = spark.readStream.format("kinesis").("streamName", "test-stream-1").load()

后来我将数据从base64编码转换如下。

df =  kinDF.withColumn("xml_data", expr("CAST(data as string)"))

现在,我需要使用 xpath 从 df.xml_data 列中提取几个字段。你能建议任何可能的解决方案吗?

如果我直接为这些 xml 文件创建一个数据框xml_df = spark.read.format("xml").options(rowTag='Consumers').load("s3a://bkt/xmldata"),我可以使用 xpath 进行查询:

xml_df.select("Analytics.Amount1").show()

但是,不确定如何在数据为文本格式的 Spark Streaming 数据帧上类似地提取元素。

是否有任何 xml 函数可以使用模式转换文本数据?我看到了一个使用 from_json 的 json 数据示例。

是否可以在数据框列上使用 spark.read?

我需要为每 5 分钟窗口查找聚合的“Amount1”。

谢谢你的帮助

标签: apache-sparkpysparkxml-parsingspark-structured-streaming

解决方案


您可以使用com.databricks.spark.xml.XmlReader从列中读取 xml 数据,但需要一个 RDD,这意味着您需要转换dfRDD使用df.rdd可能会影响性能。

以下是来自 spark java 的未经测试的代码:

import com.databricks.spark.xml

xmlRdd = df = kinDF.select("xml_data").map(r -> r[0])
new XmlReader().xmlRdd(spark, xmlRdd)

推荐阅读