首页 > 解决方案 > 如何有效地将 ListBuffer[ListBuffer[String]] 转换为多个数据帧并使用 Spark Scala 编写它们

问题描述

我正在尝试使用 Scala 和 Spark 解析一组 XML 文件。我从文件中获取“n”个数据帧的数据。(即数据帧的数量不变,只有文件的数量不同)

我正在解析一组 XML 文件并将数据存储在ListBuffer[ListBuffer[String]]. 每个都ListBuffer[String]包含数据帧的数据。例如:

ListBuffer[
    ListBuffer["1|2|3|4","5|6|7|8"],
    ListBuffer["a|b|c|d","e|f|g|h"],
    ListBuffer["q|w|e|r","w|x|y|z"]
]

这将创建 3 个数据框:

Dataframe1:
 col1  col2  col3  col4
   1     2     3     4
   5     6     7     8

和类似的其他2个数据框。

我不能直接将 XML 转换为 Dataframe,因为在制作数据框之前需要在数据中完成很多自定义处理。

我正在使用以下代码将 ListBuffer 转换为 Dataframe:

finalListBuffer.foreach{ data =>

    columns = FunctionToReturnColumnsList()
    val schema = StructType(columns.map(field => StructField(field, StringType, true)))
    val dataRDD: RDD[Row] = sparkSess.sparkContext.parallelize(data.toStream.map(l => Row.fromSeq(l.split("|", -1))))
    val df = sparkSess.createDataFrame(dataRDD, schema)
    ...
}

在这一步之后,对每个数据帧执行一些操作,(一些操作具有数据帧间的依赖关系,所以我不能只处理一个数据帧,然后写入),最后使用以下代码写入数据帧:

df.repartition(1).write.mode("Overwrite").option("multiline", "true").option("delimiter", "\u0017").csv(filename)

在执行这些步骤时,当输入文件大小很大时,我遇到了 2 个问题:

1)创建数据帧时超出了GC开销限制。(dataRDD创建变量的步骤)

2) 写入 df 时 Spark 心跳超时错误。

如何解决这些问题?

我最初正在考虑使用ListBuffer[RDD[String]],而不是ListBuffer[ListBuffer[String]]

但是可以有多达 100 万个文件,每个文件最多可以有 10-20 个 df 条目。我正在做的是,我列出所有文件,并逐个处理它们,并将它们的结果附加到主 ListBuffer。所以,如果我使用 RDD,我将不得不为每个文件使用 union,这可能会很昂贵。还能做什么?

标签: scalaapache-sparkxml-parsingapache-spark-sqlscala-collections

解决方案


scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> val lbs = ListBuffer(
     |     ListBuffer("1|2|3|4","5|6|7|8"),
     |     ListBuffer("a|b|c|d","e|f|g|h"),
     |     ListBuffer("q|w|e|r","w|x|y|z")
     | )
lbs: scala.collection.mutable.ListBuffer[scala.collection.mutable.ListBuffer[String]] = ListBuffer(ListBuffer(1|2|3|4, 5|6|7|8), ListBuffer(a|b|c|d, e|f|g|h), ListBuffer(q|w|e|r, w|x|y|z))


scala> val schema = StructType(Seq(StructField("c1", StringType, true),StructField("c2", StringType, true),StructField("c3", StringType, true),StructField("c4", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(c1,StringType,true), StructField(c2,StringType,true), StructField(c3,StringType,true), StructField(c4,StringType,true))

scala> var lb_df: ListBuffer[DataFrame] = ListBuffer()
lb_df: scala.collection.mutable.ListBuffer[org.apache.spark.sql.DataFrame] = ListBuffer()

scala> def createDF(lb: ListBuffer[String]) = spark.createDataFrame(spark.sparkContext.parallelize(lb.toSeq).map(_.toString.split("\\|")).map(Row(_: _*)), schema)
createDF: (lb: scala.collection.mutable.ListBuffer[String])org.apache.spark.sql.DataFrame


scala> lbs.foreach(lb => lb_df.append(createDF(lb)))

scala> lb_df.foreach(_.show())
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  6|  7|  8|
+---+---+---+---+

+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  a|  b|  c|  d|
|  e|  f|  g|  h|
+---+---+---+---+

+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  q|  w|  e|  r|
|  w|  x|  y|  z|
+---+---+---+---+

我希望这是有帮助的。


推荐阅读