scala - 如何有效地将 ListBuffer[ListBuffer[String]] 转换为多个数据帧并使用 Spark Scala 编写它们
问题描述
我正在尝试使用 Scala 和 Spark 解析一组 XML 文件。我从文件中获取“n”个数据帧的数据。(即数据帧的数量不变,只有文件的数量不同)
我正在解析一组 XML 文件并将数据存储在ListBuffer[ListBuffer[String]]
. 每个都ListBuffer[String]
包含数据帧的数据。例如:
ListBuffer[
ListBuffer["1|2|3|4","5|6|7|8"],
ListBuffer["a|b|c|d","e|f|g|h"],
ListBuffer["q|w|e|r","w|x|y|z"]
]
这将创建 3 个数据框:
Dataframe1:
col1 col2 col3 col4
1 2 3 4
5 6 7 8
和类似的其他2个数据框。
我不能直接将 XML 转换为 Dataframe,因为在制作数据框之前需要在数据中完成很多自定义处理。
我正在使用以下代码将 ListBuffer 转换为 Dataframe:
finalListBuffer.foreach{ data =>
columns = FunctionToReturnColumnsList()
val schema = StructType(columns.map(field => StructField(field, StringType, true)))
val dataRDD: RDD[Row] = sparkSess.sparkContext.parallelize(data.toStream.map(l => Row.fromSeq(l.split("|", -1))))
val df = sparkSess.createDataFrame(dataRDD, schema)
...
}
在这一步之后,对每个数据帧执行一些操作,(一些操作具有数据帧间的依赖关系,所以我不能只处理一个数据帧,然后写入),最后使用以下代码写入数据帧:
df.repartition(1).write.mode("Overwrite").option("multiline", "true").option("delimiter", "\u0017").csv(filename)
在执行这些步骤时,当输入文件大小很大时,我遇到了 2 个问题:
1)创建数据帧时超出了GC开销限制。(dataRDD
创建变量的步骤)
2) 写入 df 时 Spark 心跳超时错误。
如何解决这些问题?
我最初正在考虑使用ListBuffer[RDD[String]]
,而不是ListBuffer[ListBuffer[String]]
但是可以有多达 100 万个文件,每个文件最多可以有 10-20 个 df 条目。我正在做的是,我列出所有文件,并逐个处理它们,并将它们的结果附加到主 ListBuffer。所以,如果我使用 RDD,我将不得不为每个文件使用 union,这可能会很昂贵。还能做什么?
解决方案
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer
scala> val lbs = ListBuffer(
| ListBuffer("1|2|3|4","5|6|7|8"),
| ListBuffer("a|b|c|d","e|f|g|h"),
| ListBuffer("q|w|e|r","w|x|y|z")
| )
lbs: scala.collection.mutable.ListBuffer[scala.collection.mutable.ListBuffer[String]] = ListBuffer(ListBuffer(1|2|3|4, 5|6|7|8), ListBuffer(a|b|c|d, e|f|g|h), ListBuffer(q|w|e|r, w|x|y|z))
scala> val schema = StructType(Seq(StructField("c1", StringType, true),StructField("c2", StringType, true),StructField("c3", StringType, true),StructField("c4", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(c1,StringType,true), StructField(c2,StringType,true), StructField(c3,StringType,true), StructField(c4,StringType,true))
scala> var lb_df: ListBuffer[DataFrame] = ListBuffer()
lb_df: scala.collection.mutable.ListBuffer[org.apache.spark.sql.DataFrame] = ListBuffer()
scala> def createDF(lb: ListBuffer[String]) = spark.createDataFrame(spark.sparkContext.parallelize(lb.toSeq).map(_.toString.split("\\|")).map(Row(_: _*)), schema)
createDF: (lb: scala.collection.mutable.ListBuffer[String])org.apache.spark.sql.DataFrame
scala> lbs.foreach(lb => lb_df.append(createDF(lb)))
scala> lb_df.foreach(_.show())
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
| 1| 2| 3| 4|
| 5| 6| 7| 8|
+---+---+---+---+
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
| a| b| c| d|
| e| f| g| h|
+---+---+---+---+
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
| q| w| e| r|
| w| x| y| z|
+---+---+---+---+
我希望这是有帮助的。
推荐阅读
- android - 为什么 Android Studio - Flutter 在 ubuntu 中对 pubspec.yaml 进行任何更改后一直卡住
- random - Julia,生成四个数组,1:3 之间的 Int8 整数,我需要一个包吗?
- python - 在此类的方法中使用类变量
- apache - 如何在 Ubuntu 20.04 中将 XAMPP 添加到我的桌面?
- html - 与 ngModel 一起使用时未显示 Mat 错误
- php - 根据 SOLID 原则,文件上传过程在 Laravel 上应该如何进行?
- javascript - 如何在 javascript 中捕获 xmlhttprequests 挂起计数
- flutter - Flutter CachedNetworkImage 错误:(CacheManager: Failed to download file)和(SocketException or HttpException)
- java - 找不到符号:变量 raw,位置:R 类
- python - 我在使用 pop 的范围内,但仍然出现超出范围的错误