How to split rows on every nth delimiter in Spark Scala

Question

I have the following data stored in a csv file:

1|Roy|NA|2|Marry|4.6|3|Richard|NA|4|Joy|NA|5|Joe|NA|6|Jos|9|

Now I want to read the file into a Spark DataFrame, then split each line on every 3rd `|` and store each group of three fields as its own row.

Expected output:

1|Roy|NA|
2|Marry|4.6|
3|Richard|NA|
4|Joy|NA|
5|Joe|NA|
6|Jos|9|

Can you help me get output like the above?

Tags: scala, dataframe, apache-spark

Solution


First, read your csv file:

val df = spark.read.option("delimiter", "|").csv(file)

This gives you the following DataFrame (the trailing delimiter produces an extra null column, `_c18`):

+---+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4  |_c5|_c6|_c7    |_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|
+---+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+----+
|1  |Roy|NA |2  |Marry|4.6|3  |Richard|NA |4  |Joy |NA  |5   |Joe |NA  |6   |Jos |9   |null|
|1  |Roy|NA |2  |Marry|4.6|3  |Richard|NA |4  |Joy |NA  |5   |Joe |NA  |6   |Jos |9   |null|
|1  |Roy|NA |2  |Marry|4.6|3  |Richard|NA |4  |Joy |NA  |5   |Joe |NA  |6   |Jos |9   |null|
+---+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+----+

The last column is created by the trailing delimiter in the csv file, so we drop it:

val dataframe = df.drop(df.schema.last.name)
dataframe.show(false)
+---+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4  |_c5|_c6|_c7    |_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|
+---+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+
|1  |Roy|NA |2  |Marry|4.6|3  |Richard|NA |4  |Joy |NA  |5   |Joe |NA  |6   |Jos |9   |
|1  |Roy|NA |2  |Marry|4.6|3  |Richard|NA |4  |Joy |NA  |5   |Joe |NA  |6   |Jos |9   |
|1  |Roy|NA |2  |Marry|4.6|3  |Richard|NA |4  |Joy |NA  |5   |Joe |NA  |6   |Jos |9   |
+---+---+---+---+-----+---+---+-------+---+---+----+----+----+----+----+----+----+----+
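Before building the Spark version, the core idea — regrouping a flat sequence of fields into rows of three — can be sketched in plain Scala (a minimal illustration, no Spark required, using the line from the question):

```scala
// Split the raw line on "|" (Java's split drops trailing empty strings),
// then regroup the 18 fields into 6 groups of 3 and re-join each group.
val line = "1|Roy|NA|2|Marry|4.6|3|Richard|NA|4|Joy|NA|5|Joe|NA|6|Jos|9|"
val rows = line.split("\\|").grouped(3).map(_.mkString("|") + "|").toList
rows.foreach(println)
// 1|Roy|NA|
// 2|Marry|4.6|
// ...
// 6|Jos|9|
```

The DataFrame function below applies the same three-at-a-time grouping, but over columns instead of raw string fields.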

Then create an array with the list of column names you want in the final DataFrame:

val names: Array[String] = Array("colOne", "colTwo", "colThree")

Finally, you need a function that reads the columns three at a time (note the loop must run through the last group of three, otherwise the final `6|Jos|9` rows are lost):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

def splitCSV(dataFrame: DataFrame, columnNames: Array[String], sparkSession: SparkSession): DataFrame = {
  import sparkSession.implicits._
  val columns = dataFrame.columns
  // Start from an empty DataFrame that carries the target column names
  var finalDF: DataFrame = Seq.empty[(String, String, String)].toDF(columnNames: _*)
  // Step through the columns three at a time and union each slice onto the result
  for (order <- 0 until columns.length by 3) {
    finalDF = finalDF.union(dataFrame.select(
      col(columns(order)).as(columnNames(0)),
      col(columns(order + 1)).as(columnNames(1)),
      col(columns(order + 2)).as(columnNames(2))))
  }
  finalDF
}

After applying this function to the DataFrame:

val finalDF = splitCSV(dataframe, names, spark)
finalDF.show(false)

+------+-------+--------+
|colOne|colTwo |colThree|
+------+-------+--------+
|1     |Roy    |NA      |
|1     |Roy    |NA      |
|1     |Roy    |NA      |
|2     |Marry  |4.6     |
|2     |Marry  |4.6     |
|2     |Marry  |4.6     |
|3     |Richard|NA      |
|3     |Richard|NA      |
|3     |Richard|NA      |
|4     |Joy    |NA      |
|4     |Joy    |NA      |
|4     |Joy    |NA      |
|5     |Joe    |NA      |
|5     |Joe    |NA      |
|5     |Joe    |NA      |
|6     |Jos    |9       |
|6     |Jos    |9       |
|6     |Jos    |9       |
+------+-------+--------+
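As a side note, the loop above issues one select-plus-union per group of three columns. An alternative sketch (untested, assuming the same `file` path and a live SparkSession named `spark` as in the snippets above) reads the file as plain text and regroups the fields with a single flatMap instead:

```scala
// assumes `spark` (a SparkSession) and `file` (the csv path) from the snippets above
import spark.implicits._

val finalDF = spark.read.textFile(file)
  .flatMap { line =>
    // group the fields three at a time; `collect` drops an incomplete trailing group
    line.split("\\|").grouped(3).toSeq.collect { case Array(a, b, c) => (a, b, c) }
  }
  .toDF("colOne", "colTwo", "colThree")

finalDF.show(false)
```

This avoids depending on how the csv reader names and counts the columns, at the cost of treating every field as a string.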

