scala - 如何在 Spark Scala 中按第 n 个分隔符拆分行
问题描述
我有以下存储在 csv 文件中的数据
1|Roy|NA|2|Marry|4.6|3|Richard|NA|4|Joy|NA|5|Joe|NA|6|Jos|9|
现在我想读取文件并将其存储在 spark 数据帧中,然后将其存储到数据帧中,我想每隔 3 次拆分一次|
并将其存储为一行。
预期输出:
1|Roy|NA|
2|Marry|4.6|
3|Richard|NA|
4|Joy|NA|
5|Joe|NA|
6|Jos|9|
你能帮我得到像上面这样的输出吗?
解决方案
首先阅读您的 csv 文件
val df = spark.read.option("delimiter", "|").csv(file)
这会给你这个数据框
+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+----+
|_c1|_c2|_c3|_c4 |_c5|_c6|_c7 |_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|
+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+----+
|Roy|NA |2 |Marry|4.6|3 |Richard|NA |4 |Joy |NA |5 |Joe |NA |6 |Jos |9 |null|
|Roy|NA |2 |Marry|4.6|3 |Richard|NA |4 |Joy |NA |5 |Joe |NA |6 |Jos |9 |null|
|Roy|NA |2 |Marry|4.6|3 |Richard|NA |4 |Joy |NA |5 |Joe |NA |6 |Jos |9 |null|
+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+----+
最后一列是由于 csv 文件中的最后一个分隔符而创建的,因此我们将其删除
val dataframe = df.drop(df.schema.last.name)
dataframe.show(false)
+---+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4 |_c5|_c6|_c7 |_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|
+---+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+
|1 |Roy|NA |2 |Marry|4.6|3 |Richard|NA |4 |Joy |NA |5 |Joe |NA |6 |Jos |9 |
|1 |Roy|NA |2 |Marry|4.6|3 |Richard|NA |4 |Joy |NA |5 |Joe |NA |6 |Jos |9 |
|1 |Roy|NA |2 |Marry|4.6|3 |Richard|NA |4 |Joy |NA |5 |Joe |NA |6 |Jos |9 |
+---+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+
然后,您需要创建一个数组,其中包含您需要在最终数据框中拥有的列名列表
val names : Array[String] = Array("colOne", "colTwo", "colThree")
最后,您需要一个读取 3 的函数
def splitCSV(dataFrame: DataFrame, columnNames : Array[String], sparkSession: SparkSession) : DataFrame = {
import sparkSession.implicits._
val columns = dataFrame.columns
var finalDF : DataFrame = Seq.empty[(String,String,String)].toDF(columnNames:_*)
for(order <- 0 until(columns.length) -3 by(3) ){
finalDF = finalDF.union(dataFrame.select(col(columns(order)).as(columnNames(0)), col(columns(order+1)).as(columnNames(1)), col(columns(order+2)).as(columnNames(2))))
}
finalDF
}
在我们将这个函数应用于数据框之后
val finalDF = splitCSV(dataframe, names, sparkSession)
finalDF.show(false)
+------+-------+--------+
|colOne|colTwo |colThree|
+------+-------+--------+
|1 |Roy |NA |
|1 |Roy |NA |
|1 |Roy |NA |
|2 |Marry |4.6 |
|2 |Marry |4.6 |
|2 |Marry |4.6 |
|3 |Richard|NA |
|3 |Richard|NA |
|3 |Richard|NA |
|4 |Joy |NA |
|4 |Joy |NA |
|4 |Joy |NA |
|5 |Joe |NA |
|5 |Joe |NA |
|5 |Joe |NA |
+------+-------+--------+