首页 > 解决方案 > Scala - 如何合并 HDFS 位置的增量文件

问题描述

我的要求是我有一个多个 HDFS 位置,每小时从 Kafka 摄取文件。因此,对于每个目录,如何将特定时间戳的所有文件合并到当前时间戳作为单个拼花文件,下一次将文件从最后合并的时间戳合并到当前时间戳,并在未来重复相同的操作。这就是我在 Spark Scala 工作中要做的所有事情,所以不能使用普通的 shell 脚本。任何建议表示赞赏。

标签: scalashellfileapache-sparkhdfs

解决方案


这是一个有助于完成任务的代码片段。

第一步是获取每个日期的文件列表作为地图。(Map[String, List[String]])其中键是日期,值是具有相同日期的文件列表。日期取自 HDFS 文件的修改时间戳。

注意:使用本地路径测试代码,根据需要提供正确的HDFS路径/ url。

在编写输出时,没有直接选项来指定目标文件名,但您可以指定特定于每个日期的目标目录。代码让我们使用 FileSystem API 将文件重命名为所需的名称,并删除每个日期创建的临时输出文件夹。

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.SparkContext
import org.joda.time.format.DateTimeFormat


object MergeFiles {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Merging files day wise in a directory")
      .master("local[2]")
      .getOrCreate()

    val inputDir = "/Users/sujesh/test_data"
    val outputDir = "/Users/sujesh/output_data"

    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val fs = FileSystem.get(hadoopConf)

    val filesPerDate = getFiles(inputDir, fs)

    filesPerDate
      .foreach { m =>
        spark
          .read
          .format("csv")
          .option("inferSchema", false)
          .option("header", false)
          .load(m._2:_*)
          .repartition(1)
          .write
          .format("csv")
          .save(s"$outputDir/${m._1}")

        val file = fs.globStatus(new Path(s"$outputDir/${m._1}/part*.csv"))(0).getPath.getName
        fs.rename(new Path(s"$outputDir/${m._1}/$file"), new Path(s"$outputDir/${m._1}.csv"))
        fs.delete(new Path(s"$outputDir/${m._1}"), true)
      }
  }

  /*
    Get the list of files group by date
    date is taken from file's modification timestamp
   */
  def getFiles(dir: String, fs: FileSystem) = {
    fs
      .globStatus(new Path(s"$dir/*.csv"))
      .map { f: FileStatus =>
        (DateTimeFormat.forPattern("yyyyMMdd").print(f.getModificationTime), f.getPath.toUri.getRawPath)
       }.groupBy(_._1)
       .map { case (k,v) => (k -> v.map(_._2).toSeq) }
  }
}

您可以在测试后进一步优化代码并将文件重命名代码转换为 util 如果必须重新使用它。已将所有选项(例如inferSchemaheader)设置为 false。根据需要使用它们。这种方法也适用于其他格式的文件。

注意:如果您在同一目录中重复执行此过程,则需要进一步调整,因为新创建的文件将具有最新的时间戳。因此,如果这不是每天运行,您也需要显式更新文件的修改时间戳或忽略具有文件名模式的文件,例如yyyyMMdd.csv


推荐阅读