首页 > 解决方案 > Spark Listener在Executors上的onJobComplete上执行钩子?

问题描述

我有一个简单的 spark 作业,它从 S3 读取 csv 数据,对其进行转换,对其进行分区并将其保存到本地文件系统。

我在 s3 上有 csv 文件,内容如下

样本输入:日本,2020 年 1 月 1 日,天气,提供商,设备

case class WeatherReport(country:String, date:String, event:String, provide:String, device:String )

object SampleSpark extends App{

     val conf = new SparkConf()
      .setAppName("processing")
      .setIfMissing("spark.master", "local[*]")
      .setIfMissing("spark.driver.host", "localhost")

     val sc = new SparkContext(conf)

     val baseRdd = sc.textFile("s3a://mybucket/sample/*.csv")

     val weatherDataFrame = baseRdd
     .filter(_.trim.nonEmpty)
     .map(x => WeatherReport(x))
     .toDF()

     df.write.partitionBy("date")
      .mode(SaveMode.Append)
      .format("com.databricks.spark.csv")
      .save("outputDirectory")
}

该文件保存在“outputDirectory/date=01-01-2020/part-”中,其中包含 1 个以上的部分文件。我想合并零件文件并删除前缀date=名称,如“outputDirectory/01-01-2020/output.csv”并将其复制到 S3。

怎么可能做到??

我想过像下面那样使用 SparkListener,但我想它只会在 Drive 上运行,但文件会出现在 Executors 上。

sparkContext.addListener(new SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        renameDirectory()
        mergePartFilesToSingleFiles()
        uploadFileToS3()
      }
})

有没有办法在 Executors 和 Driver 上运行 post Job Completion 挂钩,将它们上的所有本地文件同步到 S3?

标签: scalaapache-sparkapache-spark-sqlhdfs

解决方案


推荐阅读