Exploding complex nested XML in Spark

Problem description

Hi all,

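I am trying to parse an XML file in Spark, using the explode function to flatten the data. Below are the input schema, the required output schema, and the code.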

 Input Schema
   root
     |-- _no: string (nullable = true)
     |-- _sc: double (nullable = true)
     |-- _xsi: string (nullable = true)
     |-- header: struct (nullable = true)
     |    |-- con: string (nullable = true)
     |    |-- co: string (nullable = true)
     |    |-- cr: date (nullable = true)
     |    |-- pe: string (nullable = true)
     |    |-- st: timestamp (nullable = true)
     |-- scs: struct (nullable = true)
     |    |-- _te: string (nullable = true)
     |    |-- scle: array (nullable = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- cId: long (nullable = true)
     |    |    |    |-- eId: long (nullable = true)
     |    |    |    |-- ent: array (nullable = true)
     |    |    |    |    |-- eent: struct (containsNull = true)
     |    |    |    |    |    |-- MSId: string (nullable = true)
     |    |    |    |    |    |-- _date: date (nullable = true)
     |    |    |    |    |    |-- uas: string (nullable = true)
     |    |    |    |    |    |-- tes: struct (nullable = true)
     |    |    |    |    |    |    |-- time: string (nullable = true)
     |    |    |    |    |    |-- tv: struct (nullable = true)
     |    |    |    |    |    |    |-- LUE: string (nullable = true)
     |    |    |    |    |    |    |-- _dour: string (nullable = true)
     |    |    |    |    |    |    |-- nete: string (nullable = true)
     |    |    |    |    |    |    |-- netSy: string (nullable = true)
     |    |    |    |    |    |    |-- parum: struct (nullable = true)
     |    |    |    |    |    |    |    |-- UE: long (nullable = true)
     |    |    |    |    |    |    |    |-- Parts: long (nullable = true)
     |    |    |    |    |    |    |-- sa: struct (nullable = true)
     |    |    |    |    |    |    |    |-- VA: boolean (nullable = true)
     |    |    |    |    |    |    |    |-- ng: string (nullable = true)
     |    |    |    |    |    |    |-- stitled: struct (nullable = true)
     |    |    |    |    |    |    |    |-- LUE: boolean (nullable = true)
     |    |    |    |    |    |    |    |-- ng: string (nullable = true)
     |    |    |    |    |    |    |-- tvering: struct (nullable = true)
     |    |    |    |    |    |    |    |-- dfLUE: string (nullable = true)
     |    |    |    |    |    |    |    |-- _body: string (nullable = true)
     |    |    |    |    |    |    |-- ubting: struct (nullable = true)
     |    |    |    |    |    |    |    |-- _LUE: string (nullable = true)
     |    |    |    |    |    |    |    |-- dy: string (nullable = true)

Required output schema:

root
 |-- _no: string (nullable = true)
 |-- _sc: double (nullable = true)
 |-- _xsi: string (nullable = true)
 |-- header: struct (nullable = true)
 |-- con: string (nullable = true)
 |-- co: string (nullable = true)
 |-- cr: date (nullable = true)
 |-- pe: string (nullable = true)
 |-- st: timestamp (nullable = true)
 |-- scs: struct (nullable = true)
 |-- _te: string (nullable = true)
 |-- scle: array (nullable = true)
 |-- element: struct (containsNull = true)
 |-- cId: long (nullable = true)
 |-- eId: long (nullable = true)
 |-- ent: array (nullable = true)
 |-- eent: struct (containsNull = true)
 |-- MSId: string (nullable = true)
 |-- _date: date (nullable = true)
 |-- uas: string (nullable = true)
 |-- tes: struct (nullable = true)
 |-- time: string (nullable = true)
 |-- tv: struct (nullable = true)
 |-- LUE: string (nullable = true)
 |-- _dour: string (nullable = true)
 |-- nete: string (nullable = true)
 |-- netSy: string (nullable = true)
 |-- parum: struct (nullable = true)
 |-- UE: long (nullable = true)
 |-- Parts: long (nullable = true)
 |-- sa: struct (nullable = true)
 |-- VA: boolean (nullable = true)
 |-- ng: string (nullable = true)
 |-- stitled: struct (nullable = true)
 |-- LUE: boolean (nullable = true)
 |-- ng: string (nullable = true)
 |-- tvering: struct (nullable = true)
 |-- dfLUE: string (nullable = true)
 |-- _body: string (nullable = true)
 |-- ubting: struct (nullable = true)
 |-- _LUE: string (nullable = true)
 |-- dy: string (nullable = true)

The XML file is 100 MB. When I read it, the DataFrame's count is 1, so I believe Spark is reading the entire XML file into a single row.

Code used for the explodes:

import org.apache.spark.sql.functions.{col, explode}

val readxml = spark.read.format("xml").option("rowTag", "on")
  .option("inferSchema", "true").load("/path")
// Each withColumn(explode(...)) emits one row per element of the exploded array,
// for every row produced by the previous step.
val co = readxml.withColumn("cId", explode(col("scs.scle.cId")))
  .withColumn("eId", explode(col("scs.scle.eId")))
  .withColumn("exploded_sc", explode(col("scs.scle.ent")))
  .withColumn("uas", explode(col("exploded_sc.uas")))
  .withColumn("ag_dt", explode(col("exploded_sc._date")))
  .withColumn("time", explode(col("exploded_sc.tes.time")))
  .withColumn("MSId", explode(col("exploded_sc.MSId")))
  .withColumn("exploded_tv", explode(col("exploded_sc.tv")))
val finalDF = co.select(col("_sc"), col("header.*"), col("scs._te").as("_te"),
  col("cId"), col("eId"),
  col("MSId"), col("time"), col("exploded_tv._dour").as("_dour"),
  col("exploded_tv.tvering.dfLUE").as("tvra"),
  col("exploded_tv.tvering._body").as("body"),
  col("exploded_tv.parum.UE").as("pnum"),
  col("exploded_tv.parum.Parts").as("npart"),
  col("exploded_tv.ubting._LUE").as("tsting"),
  col("exploded_tv.ubting.dy").as("tsting_body"),
  col("exploded_tv.nete").as("netsrce"),
  col("exploded_tv.netSy").as("nettype"),
  col("exploded_tv.sa.VA").as("sp"),
  col("exploded_tv.sa.ng").as("lg"),
  col("exploded_tv.stitled.LUE").as("subled"),
  col("exploded_tv.stitled.ng").as("sud_ng"),
  col("uas"), col("ag_dt"))
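Why the chained explodes multiply rows can be seen without Spark. The sketch below is a plain-Scala model (the case class Scle and the sample values are hypothetical, not from the question): exploding scs.scle.cId and then scs.scle.eId on the same row pairs every cId with every eId, which is a Cartesian product, while exploding the parent scle array once keeps each cId next to its own eId.

```scala
// Hypothetical model of one scle element: each carries its own cId and eId.
final case class Scle(cId: Long, eId: Long)

object ExplodeBlowUp {
  // One input row holding an array of three scle elements.
  val scle: List[Scle] = List(Scle(1, 10), Scle(2, 20), Scle(3, 30))

  // Model of withColumn("cId", explode(...)) followed by
  // withColumn("eId", explode(...)) on sibling arrays of the same parent:
  // every cId value is combined with every eId value.
  val chained: List[(Long, Long)] =
    for {
      c <- scle.map(_.cId)
      e <- scle.map(_.eId)
    } yield (c, e)

  // Model of exploding the parent array once and reading both fields
  // from the same element: the pairs stay aligned.
  val once: List[(Long, Long)] = scle.map(s => (s.cId, s.eId))

  def main(args: Array[String]): Unit = {
    println(s"chained explodes: ${chained.size} rows") // 9 rows
    println(s"single explode:   ${once.size} rows")    // 3 rows
  }
}
```

Each further explode over another sibling array multiplies the row count again by that array's length, which matches the growth described in the question.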

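The code above produces the required output schema, but I cannot view the data in finalDF. At first I suspected the data volume. I then started counting the rows after each explode, right after reading the XML file, and found that because of the duplication, the explode function increases the number of rows exponentially. Is there another way to achieve the output above? Can anyone help? Thanks in advance.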
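One possible alternative, sketched under assumptions: explode each array level exactly once, parent before child (scs.scle first, then that element's ent), and read the remaining fields with plain struct access, since tv is a struct rather than an array in the schema. The sample record is hypothetical and covers only a few of the question's fields (header is omitted), and spark.read.json on an inline string is used only as a stand-in for the spark-xml read so the sketch runs without spark-xml; for the real data, the readxml DataFrame from the question would take its place.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

// Local session for the sketch; on Databricks `spark` already exists.
val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("nested-explode-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the XML: one record with 2 scle elements,
// each holding 3 ent elements (field names follow the question's schema).
val sampleJson: String =
  """{"_sc": 1.0, "scs": {"_te": "t", "scle": [
    |  {"cId": 1, "eId": 10, "ent": [
    |    {"MSId": "a", "uas": "u1", "tv": {"_dour": "d1", "nete": "n1"}},
    |    {"MSId": "b", "uas": "u2", "tv": {"_dour": "d2", "nete": "n2"}},
    |    {"MSId": "c", "uas": "u3", "tv": {"_dour": "d3", "nete": "n3"}}]},
    |  {"cId": 2, "eId": 20, "ent": [
    |    {"MSId": "d", "uas": "u4", "tv": {"_dour": "d4", "nete": "n4"}},
    |    {"MSId": "e", "uas": "u5", "tv": {"_dour": "d5", "nete": "n5"}},
    |    {"MSId": "f", "uas": "u6", "tv": {"_dour": "d6", "nete": "n6"}}]}
    |]}}""".stripMargin.replace("\n", " ")

val readxml: DataFrame = spark.read.json(Seq(sampleJson).toDS())

// Step 1: explode the outer array once, so cId and eId stay paired.
val scleRows = readxml.select(
  col("_sc"),
  col("scs._te").as("_te"),
  explode(col("scs.scle")).as("scle"))

// Step 2: explode the inner ent array once per scle element.
val entRows = scleRows.select(
  col("_sc"), col("_te"),
  col("scle.cId").as("cId"),
  col("scle.eId").as("eId"),
  explode(col("scle.ent")).as("ent"))

// Step 3: tv is a struct, so plain field access is enough (no explode).
val finalDF = entRows.select(
  col("_sc"), col("_te"), col("cId"), col("eId"),
  col("ent.MSId").as("MSId"),
  col("ent.uas").as("uas"),
  col("ent.tv._dour").as("_dour"),
  col("ent.tv.nete").as("netsrce"))

finalDF.show()
```

With this layout the row count equals the total number of ent elements (6 for the sample), instead of the product of every exploded array's length. The remaining fields from the required schema, such as header.*, can be carried through the same selects. Separately, spark-xml's rowTag option selects which element becomes a row; if it names the document's root element, a count of 1 is expected. Pointing it at a repeating element would yield more rows up front, but fields outside that element (such as header) would then not be in those rows.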

Tags: xml, scala, apache-spark, xml-parsing, databricks
