首页 > 解决方案 > 如何将具有多个分隔符的文件转换为数据框

问题描述

我有一个如下的文本文件

1234_4567_DigitalDoc_XRay-01.pdf
2345_5678_DigitalDoc_CTC-03.png
1234_5684_DigitalDoc_XRay-05.pdf
1234_3345_DigitalDoc_XRay-02.pdf

我期待输出为

| catg|sub_catg|      doc_name        |revision_label|extension|
|1234|     4567|DigitalDoc_XRay-01.pdf|   01         |pdf      |

我创建了一个自定义架构

 val customSchema = StructType(
      StructField("catg", StringType, true)
        :: StructField("sub_catg", StringType, true)
        :: StructField("doc_name", StringType, true)
        :: StructField("revision_label", StringType, true)
        :: StructField("extension", StringType, true)
        :: Nil
    )

我正在尝试创建一个数据框

val df = sparkSession.read
  .format("csv")
  .schema(customSchema)
  .option("delimiter", "_")
  .load("src/main/resources/data/sample.txt")

df.show()

我想知道如何通过自定义记录打破每一行

我可能会写一个类似的java代码,有人可以帮我解决一下吗?我是新来的火花。

String word[] = line.split("_");

            String filenName[] = word[3].split("-");
            String revision = filenName[1];
            word[0]+","+word[1]+","+ word[2]+"_"+word[3]+","+revision.replace(".", " ");

标签: dataframeapache-sparkrdd

解决方案


从你离开的地方接机。您可以跳过架构定义并仅提及列名。其余的解释与代码一致

import org.apache.spark.sql.DataFrame

object ParseFileNameToInfo {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    val df : DataFrame = spark.read
      .format("csv")
      .option("delimiter", "_")
      .load("src/main/resources/sampleFileNames.txt")
      //You dont need schema definition as it alwyas simple and all columns are string
        .toDF("catg","sub_catg","doc_name","extraColumn")


    import spark.implicits._

    val output : DataFrame = df.rdd
      //Map the 4 columns to our output columns
      .map( row => {
      val extraColumn = row.getString(3)
      val fileInfo = extraColumn.substring(extraColumn.indexOf("-")+1).split("\\.")
      (row.getString(0),row.getString(1),row.getString(2).concat(row.getString(3)),fileInfo(0),fileInfo(1))
    })
      //Convert them to required output Dataframe
      .toDF("catg","sub_catg","doc_name","revision_label","extension")

    output.show()

  }

}


推荐阅读