scala - Spark Listener在Executors上的onJobComplete上执行钩子?
问题描述
我有一个简单的 spark 作业,它从 S3 读取 csv 数据,对其进行转换,对其进行分区并将其保存到本地文件系统。
我在 s3 上有 csv 文件,内容如下
样本输入:日本,2020 年 1 月 1 日,天气,提供商,设备
case class WeatherReport(country:String, date:String, event:String, provide:String, device:String )
object SampleSpark extends App{
val conf = new SparkConf()
.setAppName("processing")
.setIfMissing("spark.master", "local[*]")
.setIfMissing("spark.driver.host", "localhost")
val sc = new SparkContext(conf)
val baseRdd = sc.textFile("s3a://mybucket/sample/*.csv")
val weatherDataFrame = baseRdd
.filter(_.trim.nonEmpty)
.map(x => WeatherReport(x))
.toDF()
df.write.partitionBy("date")
.mode(SaveMode.Append)
.format("com.databricks.spark.csv")
.save("outputDirectory")
}
该文件保存在“outputDirectory/date=01-01-2020/part-”中,其中包含 1 个以上的部分文件。我想合并零件文件并删除前缀date=
名称,如“outputDirectory/01-01-2020/output.csv”并将其复制到 S3。
怎么可能做到??
我想过像下面那样使用 SparkListener,但我想它只会在 Drive 上运行,但文件会出现在 Executors 上。
sparkContext.addListener(new SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
renameDirectory()
mergePartFilesToSingleFiles()
uploadFileToS3()
}
})
有没有办法在 Executors 和 Driver 上运行 post Job Completion 挂钩,将它们上的所有本地文件同步到 S3?
解决方案
推荐阅读
- android - BillingClientImpl 中 onServiceConnected 的 ANR
- typescript - 如何使用通用接口?
- sql - SQL比较日期
- typescript - 如何修复“错误:捆绑失败:SyntaxError: { path }\PinCode\index.ts: Exporting local "IPinCodeParams",未声明。” 在 React-Native 中?
- javascript - 基于曾孙 javascript 隐藏 Div
- python - 使用套接字发送图像而不保存它
- curl - --noproxy 和 --no-proxy CURL 之间的区别?
- vb.net - 获取特殊文件夹的所有实例的列表(每个用户一个)
- ruby-on-rails - 获取 HTTP 元素 QUERY_STRING 长于 PUMA 服务器中允许的 (1024 * 10) 长度
- excel - 如何在一个范围内应用 VLOOKUP