How are job archives generated in Flink?

Problem description

When we run Flink on YARN, completed/terminated/failed jobs are stored in the job archive. For example, we have the following job archives on HDFS. Any pointers on how these job archives are generated and stored on HDFS?

-rw-r--r--   3 aaaa   hdfs      10568 2019-07-09 18:34 /tmp/flink/completed-jobs/f909a4ca58cbf1d233a798f7de9489e0
-rw-r--r--   3 bbbb   hdfs       9966 2019-06-20 22:08 /tmp/flink/completed-jobs/fa1fb72ea43348fa84232e7517ca3c91
-rw-r--r--   3 cccc   hdfs      12487 2019-06-26 20:45 /tmp/flink/completed-jobs/fa2b34566384ec621e0d05a2073b8e90
-rw-r--r--   3 dddd   hdfs      57212 2019-07-16 00:41 /tmp/flink/completed-jobs/fa76acb920eec0880a986fb23fbb9149

Tags: hdfs, apache-flink, hadoop-yarn

Solution


A relevant file can be found in the Flink repo:

https://github.com/apache/flink/blob/57a2b754f6a5d8844aa35afb511901ad7ee43068/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java#L71
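FsJobArchivist.archiveJob takes an archive root path, the JobID, and a collection of ArchivedJson entries (pre-rendered REST responses for the finished job), and writes them into a single file named after the job ID, which matches the files under /tmp/flink/completed-jobs in the question. As a quick sanity check, the sketch below reads one of those archives back with FsJobArchivist.getArchivedJsons; this is only an illustration (the class name ArchiveDump and the exact HDFS URI are assumptions), not an official tool:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.history.FsJobArchivist;
    import org.apache.flink.runtime.webmonitor.history.ArchivedJson;

    import java.io.IOException;
    import java.util.Collection;

    /** Illustrative sketch: print the JSON entries stored in one Flink job archive file. */
    public class ArchiveDump {
        public static void main(String[] args) throws IOException {
            // One of the archive files from the question; adjust the URI to your cluster.
            Path archive = new Path("hdfs:///tmp/flink/completed-jobs/f909a4ca58cbf1d233a798f7de9489e0");

            // Each archive is a single file bundling (REST path, JSON body) pairs that
            // FsJobArchivist.archiveJob wrote when the job reached a terminal state.
            Collection<ArchivedJson> entries = FsJobArchivist.getArchivedJsons(archive);
            for (ArchivedJson entry : entries) {
                System.out.println(entry.getPath()); // e.g. /jobs/<jobid>/exceptions
                System.out.println(entry.getJson()); // the JSON the HistoryServer serves for that path
            }
        }
    }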

HistoryServerArchivist is called from flink/runtime/dispatcher/Dispatcher.java:

https://github.com/apache/flink/blob/57a2b754f6a5d8844aa35afb511901ad7ee43068/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L126

    @Override
    public CompletableFuture<Acknowledge> archiveExecutionGraph(
                      AccessExecutionGraph executionGraph) {
        try {
            FsJobArchivist.archiveJob(archivePath, executionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(executionGraph));
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }
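The archivePath in this snippet is not hard-coded: the Dispatcher picks it up from the cluster configuration. The JobManager writes one archive file per finished/terminated/failed job into the directory configured by jobmanager.archive.fs.dir, and a separately started HistoryServer polls the directories listed in historyserver.archive.fs.dir and serves the archived jobs through its web UI. A minimal flink-conf.yaml sketch (the directory mirrors the listing in the question; the port and refresh interval are Flink's defaults, shown here only for illustration):

    # JobManager: write one archive file per finished job into this directory
    jobmanager.archive.fs.dir: hdfs:///tmp/flink/completed-jobs

    # HistoryServer: poll the same directory (a comma-separated list is allowed) and serve the archives
    historyserver.archive.fs.dir: hdfs:///tmp/flink/completed-jobs
    historyserver.web.port: 8082
    historyserver.archive.fs.refresh-interval: 10000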
