java - 为什么 flink 1.10.1 在使用 FsStateBackend 崩溃重启后没有加载存储的状态
问题描述
我使用 Flink 1.10.1 和 FsStateBackend 作为检查点的状态后端。我有一些有状态的操作,在应用程序运行期间(作为 .jar 应用程序而不是集群运行)它们按预期工作,但如果应用程序由于某种原因停止(或崩溃),则应存储在文件系统中的状态未加载检查点并且函数没有任何先前的引用,那么我需要从数据库中加载信息并将其保存为状态以再次使用这些先前的状态。必须有一种方法可以使用检查点和 FsStateBackend 来执行此操作,而无需从数据库中读取所有信息,只需从已存储的检查点重新加载此状态即可。这可能吗?
这是一些代码:我的检查点配置
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, GetConfiguration.getConfig());
final StateBackend stateBackend = new FsStateBackend(new Path("/some/path/checkpoints").toUri(), true);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(stateBackend);
这是我要避免的示例:
public class EventCountMap extends RichMapFunction<Event, EventCounter> {
private static final MapStateDescriptor<String, Timestamp> descriptor = new MapStateDescriptor<>("previous_counter", String.class, Timestamp.class);
private static final EventCounter eventCounter = new EventCounter();
private MapState<String, Timestamp> previous_state;
private static final StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.days(1))
.cleanupFullSnapshot()
.build();
@Override
public void open(Configuration parameters) {
descriptor.enableTimeToLive(ttlConfig);
previous_state = getRuntimeContext().getMapState(descriptor);
}
/*I want to avoid to call this function that load all events from db and pass them to the state to be used. This happens only once but there must be a efficient way to do this in flink.*/
private void mapRefueled() throws Exception {
Preconditions.checkNotNull(previous_state);
for (Map.Entry<String, Timestamp> map : StreamingJob.update_beh_count_ts.entrySet())
previous_state.put(map.getKey(), map.getValue());
StreamingJob.update_beh_count_ts.clear();
}
@Override
public EventCounter map(Event event) throws Exception {
/*Refuel map state in case of failures*/
if (!StreamingJob.update_beh_count_ts.isEmpty()) mapRefueled();
eventCounter.date = new Date(event.timestamp.getTime());
final String key_first = eventCounter.date.toString().concat("_ts_first");
final String key_last = eventCounter.date.toString().concat("_ts_last");
if (previous_state.contains(key_first) && previous_state.contains(key_last)) {
final Timestamp first = (previous_state.get(key_first).after(event.timestamp)) ? event.timestamp : previous_state.get(key_first);
final Timestamp last = (previous_state.get(key_last).before(event.timestamp)) ? event.timestamp : previous_state.get(key_last);
previous_state.put(key_first, first);
previous_state.put(key_last, last);
} else {
previous_state.put(key_first, event.timestamp);
previous_state.put(key_last, event.timestamp);
}
eventCounter.first_event = previous_state.get(key_first);
eventCounter.last_event = previous_state.get(key_last);
return eventCounter;
}
}
希望我能解释一下自己,让您了解我需要做什么。亲切的问候!提前致谢。
解决方案
推荐阅读
- r - 从 Date 到 POSIXct 对象的转换 - 错误一小时
- .net - SagePay 直接集成套件 v4.00
- google-apps-script - 如何为 Smartsheet 生成重定向 URI 以将数据传递到 Apps 脚本
- flutter - 谷歌地图标记的颤振图像叠加
- c++ - 如何在 EXPECT_CALL 中使用 ElementsAreArray 匹配器
- bash - 复制至少提到一个特定单词的文件
- java - 具有 Hibernate 子实体的“getOrCreate”行为的 CascadeType
- c# - 当 ItemsSource 设置为列表时,让“IsMouseOver”在 ListView 上工作
- excel - 如何找到过滤列的条件最小值?
- blazor - 我在哪里可以找到托管在 blazor aspnet core 中的网站?