apache-spark - Spark结构化流上的Xml解析
问题描述
我正在尝试在 Databricks 上的 PySpark Structured Streaming 中使用 Kinesis 源分析数据。
我创建了一个数据框,如下所示。
kinDF = spark.readStream.format("kinesis").("streamName", "test-stream-1").load()
后来我将数据从base64编码转换如下。
df = kinDF.withColumn("xml_data", expr("CAST(data as string)"))
现在,我需要使用 xpath 从 df.xml_data 列中提取几个字段。你能建议任何可能的解决方案吗?
如果我直接为这些 xml 文件创建一个数据框xml_df = spark.read.format("xml").options(rowTag='Consumers').load("s3a://bkt/xmldata")
,我可以使用 xpath 进行查询:
xml_df.select("Analytics.Amount1").show()
但是,不确定如何在数据为文本格式的 Spark Streaming 数据帧上类似地提取元素。
是否有任何 xml 函数可以使用模式转换文本数据?我看到了一个使用 from_json 的 json 数据示例。
是否可以在数据框列上使用 spark.read?
我需要为每 5 分钟窗口查找聚合的“Amount1”。
谢谢你的帮助
解决方案
您可以使用com.databricks.spark.xml.XmlReader
从列中读取 xml 数据,但需要一个 RDD,这意味着您需要转换df
为RDD
使用df.rdd
可能会影响性能。
以下是来自 spark java 的未经测试的代码:
import com.databricks.spark.xml
xmlRdd = df = kinDF.select("xml_data").map(r -> r[0])
new XmlReader().xmlRdd(spark, xmlRdd)
推荐阅读
- c# - 如何在 Framework 3.5 中发布 json 并获取文件流作为回报?
- ios - 如何使我的 iphone 应用程序适合 iPad?
- javascript - 在没有任何服务器的浏览器中将 Angular 5 应用程序作为静态网站运行
- javascript - 使用 underscore.js 模板对其父级唯一的动态选择选项
- jquery - Materialize CSS - 动态添加选项到选择列表
- javascript - 如何防止来自父 DOM 的点击事件?
- r - 使用带有 ggplot 的应用函数来绘制数据框列的子集
- export-to-csv - 将数据从 SQL 数据库导出到 CSV 文件,一些数据行被拆分为多行
- java - Error after changing name of Naturald parameter
- video-streaming - 如何在阿里巴巴VOD服务上上传单个视频