scala - 更改 spark _temporary 目录路径以避免删除拼花
问题描述
当两个或多个 Spark 作业具有相同的输出目录时,文件的相互删除将是不可避免的。
我正在使用 spark 2.4.4 以附加模式编写数据帧,我想在 spark 的 tmp 目录中添加时间戳以避免这些删除。
例子:
我的 JobSpark 写在hdfs:/outputFile/0/tmp/file1.parquet
用其他数据调用相同的火花作业并写入hdfs:/outputFil/0/tm/file2.parquet
我希望 jobSpark1 写入,hdfs:/outputFile/0/tmp+(timeStamp)/file1.parquet
其他作业写入,hdfs:/outputFile/0/tmp+(timeStamp)/file2.parquet
然后将镶木地板移动到 hdfs:/outputFile/
解决方案
df
.write
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")
.partitionBy("XXXXXXXX")
.mode(SaveMode.Append)
.format(fileFormat)
.save(path)
当 Spark 将数据附加到现有数据集时,Spark 使用 FileOutputCommitter 来管理暂存输出文件和最终输出文件。FileOutputCommitter 的行为直接影响写入数据的作业的性能。
FileOutputCommitter 有两个方法,commitTask 和 commitJob。Apache Spark 2.0 及更高版本使用 Apache Hadoop 2,它使用 mapreduce.fileoutputcommitter.algorithm.version 的值来控制 commitTask 和 commitJob 的工作方式。在 Hadoop 2 中,mapreduce.fileoutputcommitter.algorithm.version 的默认值为 1。对于这个版本,commitTask 将任务生成的数据从任务临时目录移动到作业临时目录,当所有任务完成后,commitJob 将数据从作业移动到最终目的地的临时目录。
因为驱动在做commitJob的工作,对于云存储来说,这个操作可能需要很长时间。您可能经常认为您的手机正在“挂起”。但是,当mapreduce.fileoutputcommitter.algorithm.version 的值为2 时,commitTask 将一个task 生成的数据直接移动到最终目的地,commitJob 基本上是一个no-op。
推荐阅读
- python-3.x - 将字符串从文件转换为字典
- angular - Angular 语言服务管道导致标识符未定义错误
- javascript - 如何选择某些其他标签内的标签
- reactjs - 更新列表数组时子组件不刷新
- python - 从 scipy.optimize.leastsq() 输出所有猜测
- docker - ingress-nginx 日志 - 出现许多奇怪的条目
- dask - Dask Worker 配置文件和 Dask Scheduler 配置文件的正确位置在哪里?
- javascript - Javascript 正则表达式只接受特定字符
- spring - Dispatcher servlet 中的 ClassCastException 为空
- nginx - 上传文件时 Nginx 413 HTTP 错误,即使 client_max_body_size 设置为高于文件大小