scala - Scala - 如何合并 HDFS 位置的增量文件
问题描述
我的要求是我有一个多个 HDFS 位置,每小时从 Kafka 摄取文件。因此,对于每个目录,如何将特定时间戳的所有文件合并到当前时间戳作为单个拼花文件,下一次将文件从最后合并的时间戳合并到当前时间戳,并在未来重复相同的操作。这就是我在 Spark Scala 工作中要做的所有事情,所以不能使用普通的 shell 脚本。任何建议表示赞赏。
解决方案
这是一个有助于完成任务的代码片段。
第一步是获取每个日期的文件列表作为地图。(Map[String, List[String]])
其中键是日期,值是具有相同日期的文件列表。日期取自 HDFS 文件的修改时间戳。
注意:使用本地路径测试代码,根据需要提供正确的HDFS路径/ url。
在编写输出时,没有直接选项来指定目标文件名,但您可以指定特定于每个日期的目标目录。代码让我们使用 FileSystem API 将文件重命名为所需的名称,并删除每个日期创建的临时输出文件夹。
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.SparkContext
import org.joda.time.format.DateTimeFormat
object MergeFiles {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Merging files day wise in a directory")
.master("local[2]")
.getOrCreate()
val inputDir = "/Users/sujesh/test_data"
val outputDir = "/Users/sujesh/output_data"
val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
val filesPerDate = getFiles(inputDir, fs)
filesPerDate
.foreach { m =>
spark
.read
.format("csv")
.option("inferSchema", false)
.option("header", false)
.load(m._2:_*)
.repartition(1)
.write
.format("csv")
.save(s"$outputDir/${m._1}")
val file = fs.globStatus(new Path(s"$outputDir/${m._1}/part*.csv"))(0).getPath.getName
fs.rename(new Path(s"$outputDir/${m._1}/$file"), new Path(s"$outputDir/${m._1}.csv"))
fs.delete(new Path(s"$outputDir/${m._1}"), true)
}
}
/*
Get the list of files group by date
date is taken from file's modification timestamp
*/
def getFiles(dir: String, fs: FileSystem) = {
fs
.globStatus(new Path(s"$dir/*.csv"))
.map { f: FileStatus =>
(DateTimeFormat.forPattern("yyyyMMdd").print(f.getModificationTime), f.getPath.toUri.getRawPath)
}.groupBy(_._1)
.map { case (k,v) => (k -> v.map(_._2).toSeq) }
}
}
您可以在测试后进一步优化代码并将文件重命名代码转换为 util 如果必须重新使用它。已将所有选项(例如inferSchema
或header
)设置为 false。根据需要使用它们。这种方法也适用于其他格式的文件。
注意:如果您在同一目录中重复执行此过程,则需要进一步调整,因为新创建的文件将具有最新的时间戳。因此,如果这不是每天运行,您也需要显式更新文件的修改时间戳或忽略具有文件名模式的文件,例如yyyyMMdd.csv
推荐阅读
- sql - 大型 MS SQL Server 表非常慢
- python - 为什么 get_success_url 一直将我引导到同一页面?(姜戈)
- angular - 从ionic3中的模态取回数据
- javascript - ReactiveSearch 中的自定义组件
- python - 按条件从列表中排除值的循环失败
- xamarin - 如何在 VS 2019 RC 中安装 Xamarin.Forms 4.0 模板?
- excel - VBA:如果然后在之后有2个语句
- http-redirect - 重定向到这样的网址是否安全:“https://example.com/”+ userData?
- html - 边框没有回来,从 EDGE 中的元素中删除了悬停
- javascript - 用于 Firefox 的带有 unicode 符号的复选框