Flink on k8s loses state: recovering the job when the jobmanager / taskmanager crashes

Problem description

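When running a Flink job cluster on Kubernetes (deployment/pod), we deleted the jobmanager and taskmanager pods (kubectl delete pod XXX). After the pods came back up and were working normally, we found that the state from the previous pods was missing. The RocksDB and checkpoint file paths are mounted from a PVC. Any suggestions on how to recover the state after the pods are running again? I double-checked the code and found that checkpointing is not enabled. Is that the root cause of the job not being recoverable?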

The environment is set up as follows:

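    // RocksDB state backend; the second argument enables incremental checkpoints
    RocksDBStateBackend backend = new RocksDBStateBackend(checkPointDataUri + "/checkpoint", true);
    // Local working directory for the RocksDB files
    backend.setDbStoragePath(checkPointDataUri + "/RocksDB");
    // Method name spelled as in the Flink API of that release
    backend.setNumberOfTransferingThreads(1);

    // add state backend
    env.setStateBackend((StateBackend) backend);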

Can we enable checkpointing like this?

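    // Take a checkpoint every 1000 ms
    env.enableCheckpointing(1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Leave at least 500 ms between the end of one checkpoint and the start of the next
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // Abort a checkpoint that takes longer than 60 s
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // Keep the checkpoint files when the job is cancelled
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);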

Here is the log from the restart:

2020-06-09 06:48:11,921 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,921 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,962 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,941 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,962 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,941 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,963 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,921 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,963 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,963 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,963 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,942 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,965 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,961 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,965 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,942 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,981 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,944 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}

Tags: apache-flink, flink-streaming, flink-cep

Solution


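It makes no sense to store RocksDB and the checkpoints on the same file system. RocksDB should use the fastest available local file system; Kubernetes ephemeral storage is fine for that. The checkpoints, on the other hand, must be stored durably in some kind of distributed file system.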
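
As a rough illustration of that split, here is a minimal configuration sketch that assumes the same Flink 1.x API used in the question. The `hdfs:///flink/checkpoints` URI, the `/tmp/flink-rocksdb` directory, the 60 s interval, and the class and job names are placeholders that do not come from the original post; any durable store the cluster can reach and any fast pod-local path would play the same roles. The sketch also turns checkpointing on, since without checkpoints there is no snapshot for Flink to restore from.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumption: a durable, distributed location reachable from every pod.
        String checkpointUri = "hdfs:///flink/checkpoints";
        // Assumption: a pod-local path, for example backed by an emptyDir volume.
        String localRocksDbDir = "/tmp/flink-rocksdb";

        // Checkpoint data goes to the durable URI; 'true' enables incremental checkpoints.
        RocksDBStateBackend backend = new RocksDBStateBackend(checkpointUri, true);
        // RocksDB working files stay on fast local storage and may be lost with the pod.
        backend.setDbStoragePath(localRocksDbDir);
        env.setStateBackend((StateBackend) backend);

        // Without this call no checkpoints are taken at all.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // Keep the latest checkpoint when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Placeholder pipeline so the sketch is a complete job.
        env.fromElements(1, 2, 3).print();
        env.execute("checkpointed-job-sketch");
    }
}
```

With this layout, losing a taskmanager pod only discards the local RocksDB working files; the state itself can be rebuilt from the last checkpoint completed in the durable location.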

