How to create one XML file per record in Spark Scala

Problem description

I have a file that contains records like the following:

1_107570667_ANA_2C68EF2F-AB17-40EF-9095-387DE1D5D745_App.xml|<CAudit><ai2aiinst nT="LevFcf#A0" auNdSTy="Analytics" auNdTy="Identifier" ndNo="1" aId="1" conDes="Levered Free Cash Flow" conCd="LevFcf" aiaGUId="1_107570667_ANA_2C68EF2F-AB17-40EF-9095-387DE1D5D745" aiaId="1" aiKey="2990569588" aiId="14" pEndDt="2013-Dec-31" perCd="A" isYr2Dt="False" ><AudNode aId="1" ndNo="2" auNdTy="Operation" auNdSTy="-" nV="2626287569.000000000000000" ><AudNode aId="1" ndNo="3" auNdTy="Operation" auNdSTy="-" nV="2825849069.000000000000000" ><AudNode aId="1" ndNo="4" auNdTy="Identifier" auNdSTy="Standardized" nT="STD.SEBITDA#A0" nV="3130019939.000000000000000" ><ai2si nV="3130019939.00000" nT="STD.SEBITDA#A0" auNdSTy="Standardized" auNdTy="Identifier" ndNo="4" aId="1" inId="1035" conDes="Earnings before Interest, Taxes, Depreciation &amp; Amortization (EBITDA)" conCd="SEBITDA" stdaGUId="841_107570667_STD_2C68EF2F-AB17-40EF-9095-387DE1D5D745" stdIaId="841" siKey="12004131416271429" siId="413" sLiCurIso="KRW" sCurIso="KRW" stCurIso="KRW" stTyCd="INC" sId="1" pEndDt="2013-Dec-31" pId="2" fId="192730348494" fbId="1" /></AudNode><AudNode aId="1" ndNo="5" auNdTy="Identifier" auNdSTy="Standardized" nT="STD.STAX#A0" nV="304170870.000000000000000" ><ai2si nV="304170870.00000" nT="STD.STAX#A0" auNdSTy="Standardized" auNdTy="Identifier" ndNo="5" aId="1" inId="968" conDes="Income Taxes" conCd="STAX" stdaGUId="807_107570667_STD_2C68EF2F-AB17-40EF-9095-387DE1D5D745" stdIaId="807" siKey="120038112041962629" siId="381" sLiCurIso="KRW" sCurIso="KRW" stCurIso="KRW" stTyCd="INC" sId="1" pEndDt="2013-Dec-31" pId="2" fId="192730348494" fbId="1" /></AudNode></AudNode><AudNode aId="1" ndNo="6" auNdTy="Operation" auNdSTy="SUM" nV="199561500.000000000000000" ><AudNode aId="1" ndNo="7" auNdTy="Identifier" auNdSTy="Standardized" nT="STD.SCEX#A0" nV="199561500.000000000000000" ><ai2si nV="199561500.00000" nT="STD.SCEX#A0" auNdSTy="Standardized" auNdTy="Identifier" ndNo="7" aId="1" inId="888" conDes="Capital 
Expenditures - Total" conCd="SCEX" stdaGUId="704_107570667_STD_2C68EF2F-AB17-40EF-9095-387DE1D5D745" stdIaId="704" siKey="12002771860094347" siId="277" sLiCurIso="KRW" sCurIso="KRW" stCurIso="KRW" stTyCd="CAS" sId="1" pEndDt="2013-Dec-31" pId="2" fId="192730348494" fbId="1" /></AudNode><AudNode aId="1" ndNo="8" auNdTy="Constant" nV="0.000000000000000" /></AudNode></AudNode></ai2aiinst></CAudit>
3_107570667_ANA_2C68EF2F-AB17-40EF-9095-387DE1D5D745_App.xml|<CAudit><ai2aiinst nT="ExcessCashMargin#A0" auNdSTy="Analytics" auNdTy="Identifier" ndNo="1" aId="3" conDes="Excess Cash Margin - %" conCd="ExcessCashMargin" aiaGUId="3_107570667_ANA_2C68EF2F-AB17-40EF-9095-387DE1D5D745" aiaId="3" aiKey="2990569579" aiId="5" pEndDt="2013-Dec-31" perCd="A" isYr2Dt="False" ><AudNode aId="3" ndNo="2" auNdTy="Operation" auNdSTy="*" nV="2.257160458878393" ><AudNode aId="3" ndNo="8" auNdTy="Identifier" auNdSTy="PseudoFinancialConcept" nT="PERCENTSCALE#A0" nV="100.000000000000000" /><AudNode aId="3" ndNo="3" auNdTy="Operation" auNdSTy="//" nV="0.022571604588784" ><AudNode aId="3" ndNo="7" auNdTy="Identifier" auNdSTy="Standardized" nT="STD.STLR#A0" nV="68201182151.000000000000000" ><ai2si nV="68201182151.00000" nT="STD.STLR#A0" auNdSTy="Standardized" auNdTy="Identifier" ndNo="7" aId="3" inId="990" conDes="Revenue from Business Activities - Total" conCd="STLR" stdaGUId="813_107570667_STD_2C68EF2F-AB17-40EF-9095-387DE1D5D745" stdIaId="813" siKey="12003871970759396" siId="387" sLiCurIso="KRW" sCurIso="KRW" stCurIso="KRW" stTyCd="INC" sId="1" pEndDt="2013-Dec-31" pId="2" fId="192730348494" fbId="1" /></AudNode><AudNode aId="3" ndNo="4" auNdTy="Operation" auNdSTy="-" nV="1539410116.000000000000000" ><AudNode aId="3" ndNo="6" auNdTy="Identifier" auNdSTy="Standardized" nT="STD.SNIC#A0" nV="438846856.000000000000000" ><ai2si nV="438846856.00000" nT="STD.SNIC#A0" auNdSTy="Standardized" auNdTy="Identifier" ndNo="6" aId="3" inId="1055" conDes="Net Income after Minority Interest" conCd="SNIC" stdaGUId="856_107570667_STD_2C68EF2F-AB17-40EF-9095-387DE1D5D745" stdIaId="856" siKey="120043012135950005" siId="430" sLiCurIso="KRW" sCurIso="KRW" stCurIso="KRW" stTyCd="INC" sId="1" pEndDt="2013-Dec-31" pId="2" fId="192730348494" fbId="1" /></AudNode><AudNode aId="3" ndNo="5" auNdTy="Identifier" auNdSTy="Standardized" nT="STD.STLO#A0" nV="1978256972.000000000000000" ><ai2si nV="1978256972.00000" 
nT="STD.STLO#A0" auNdSTy="Standardized" auNdTy="Identifier" ndNo="5" aId="3" inId="924" conDes="Net Cash Flow from Operating Activities" conCd="STLO" stdaGUId="719_107570667_STD_2C68EF2F-AB17-40EF-9095-387DE1D5D745" stdIaId="719" siKey="12002951348701451" siId="295" sLiCurIso="KRW" sCurIso="KRW" stCurIso="KRW" stTyCd="CAS" sId="1" pEndDt="2013-Dec-31" pId="2" fId="192730348494" fbId="1" /></AudNode></AudNode></AudNode></AudNode></ai2aiinst></CAudit>
5_107570667_ANA_2C68EF2F-AB17-40EF-9095-387DE1D5D745_App.xml|<CAudit><ai2aiinst nT="Cf#A0" auNdSTy="Analytics" auNdTy="Identifier" ndNo="1" aId="5" conDes="Cash Flow" conCd="Cf" aiaGUId="5_107570667_ANA_2C68EF2F-AB17-40EF-9095-387DE1D5D745" aiaId="5" aiKey="2990569577" aiId="3" pEndDt="2013-Dec-31" perCd="A" isYr2Dt="False" ><AudNode aId="5" ndNo="2" auNdTy="Operation" auNdSTy="-" nV="898935497.000000000000000" ><AudNode aId="5" ndNo="6" auNdTy="Constant" nV="0.000000000000000" /><AudNode aId="5" ndNo="3" auNdTy="Operation" auNdSTy="+" nV="898935497.000000000000000" ><AudNode aId="5" ndNo="5" auNdTy="Identifier" auNdSTy="Standardized" nT="STD.STDAE#A0" nV="460088641.000000000000000" ><ai2si nV="460088641.00000" nT="STD.STDAE#A0" auNdSTy="Standardized" auNdTy="Identifier" ndNo="5" aId="5" inId="956" conDes="Depreciation, Depletion &amp; Amortization - Total" conCd="STDAE" stdaGUId="796_107570667_STD_2C68EF2F-AB17-40EF-9095-387DE1D5D745" stdIaId="796" siKey="120036611860540497" siId="366" sLiCurIso="KRW" sCurIso="KRW" stCurIso="KRW" stTyCd="INC" sId="1" pEndDt="2013-Dec-31" pId="2" fId="192730348494" fbId="1" /></AudNode><AudNode aId="5" ndNo="4" auNdTy="Identifier" auNdSTy="Standardized" nT="STD.SIAT#A0" nV="438846856.000000000000000" ><ai2si nV="438846856.00000" nT="STD.SIAT#A0" auNdSTy="Standardized" auNdTy="Identifier" ndNo="4" aId="5" inId="1018" conDes="Net Income after Tax" conCd="SIAT" stdaGUId="831_107570667_STD_2C68EF2F-AB17-40EF-9095-387DE1D5D745" stdIaId="831" siKey="120040511473155197" siId="405" sLiCurIso="KRW" sCurIso="KRW" stCurIso="KRW" stTyCd="INC" sId="1" pEndDt="2013-Dec-31" pId="2" fId="192730348494" fbId="1" /></AudNode></AudNode></AudNode></ai2aiinst></CAudit>

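I need to create one XML file for each line. The name of each XML file is the first column, that is, the text before the "|". So in this case I will have 3 XML files, as follows: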

1_107570667_ANA_2C68EF2F-AB17-40EF-9095-387DE1D5D745_App.xml
3_107570667_ANA_2C68EF2F-AB17-40EF-9095-387DE1D5D745_App.xml
5_107570667_ANA_2C68EF2F-AB17-40EF-9095-387DE1D5D745_App.xml

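Each XML file will contain the content that follows the "|".

I will have about 500,000 lines like this, and I need to create an XML file for each line.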

Tags: scala, apache-spark, apache-spark-sql

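Solution

First, create a pair RDD whose elements are (file name, file content) tuples, then use that pair RDD to write the individual files to disk or to Hadoop.

You can take a look at the following code snippet: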

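import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

// Output format that writes each value to a file named after its key.
class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Returning NullWritable keeps the key (the file name) out of the file body.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  // Use the key as the file name, relative to the output path.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}

val input = sparkSession.sparkContext.textFile("<your_input_file>")

// Build (fileName, fileContent) pairs. The limit of 2 splits at the first '|' only,
// so a '|' inside the XML content is not cut off.
val pairedRDD = input.map { row =>
  val split = row.split("\\|", 2)
  (split(0), split(1))
}

// Partition by key so that all records for one file name go to the same task,
// then write one file per key under <output_path>.
pairedRDD
  .partitionBy(new HashPartitioner(1000))
  .saveAsHadoopFile("<output_path>", classOf[String], classOf[String], classOf[RddMultiTextOutputFormat])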
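
One thing to watch for with 500,000 lines: a line that contains no "|" makes split(1) throw an ArrayIndexOutOfBoundsException and fails the job. Below is a minimal sketch of a more defensive parsing step. The names RecordParser and parseRecord are made up for illustration and are not part of Spark; the sketch also assumes that silently skipping malformed lines is acceptable for your data, which may not be the case.

```scala
object RecordParser {
  // Hypothetical helper: splits "fileName|content" at the first '|' only,
  // so any '|' inside the XML payload is preserved.
  // Returns None when there is no separator or the file name is empty.
  def parseRecord(line: String): Option[(String, String)] = {
    val idx = line.indexOf('|')
    if (idx <= 0) None
    else Some((line.substring(0, idx), line.substring(idx + 1)))
  }
}
```

If this fits your case, it could probably replace the map step with something like input.flatMap(line => RecordParser.parseRecord(line)), which drops the lines that do not parse. If malformed lines should be reported rather than dropped, you would need to collect them separately instead.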


