apache-flink - Flink 中的检查点随时间增加
问题描述
汇总到这个问题,我仍然不清楚为什么我的 Flink 作业的检查点会随着时间的推移而增长和增长,目前,运行大约 7 天,这些检查点从未达到稳定状态。目前我使用的是 Flink 1.10 版本,FS State Backend 作为我的工作无法承受使用 RocksDB 的延迟成本。
查看检查点在 7 天内的演变情况: 假设我为所有有状态操作符中的状态的 TTL 配置了一个小时或更长的时间,在一种情况下为一天:
public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot().build();
在我看来,所有进入状态的对象都将在过期时间后被清理,因此检查点的大小应该减少,并且我们预计每天的数据量或多或少相同。
另一方面,我们有一条流量曲线,它在一天中的几个小时内有更多的传入数据,但深夜流量下降,所有进入过期状态的对象都应该被清理,导致检查点大小应该减小在流量再次上升之前不会保持相同的大小。
让我们看一下这个用例的代码示例:
DataStream<Event> stream = addSource(source);
KeyedStream<Event, String> keyedStream = stream.filter((FilterFunction<Event>) event ->
apply filters here;))
.name("Events filtered")
.keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction())
public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event>{
private final MapStateDescriptor<String, Event> descriptor = new MapStateDescriptor<>("prev_state", String.class, Event.class);
private MapState<String, Event> previousState;
@Override
public void open(Configuration parameters) {
/*ttlConfig described above*/
descriptor.enableTimeToLive(ttlConfig);
previousState = getRuntimeContext().getMapState(descriptor);
}
@Override
public void flatMap(Event event, Collector<Event> collector) throws Exception {
final String key = event.rType.equals("something") ? event.id1 : event.id2;
Event previous = previousState.get(key);
if(previous != null){
/*something done here*/
}else /*something done here*/
previousState.put(key, previous);
collector.collect(previous);
}
}
或多或少这些是用例的结构,还有一些使用 Windows(时间窗口或会话窗口)的结构
问题:
- 我在这里做错了什么?
- 状态是否在到期时被清理,这种情况与其他用例相同吗?
- 如果它们工作错误,什么可以帮助我修复检查点的大小?
- 这种行为正常吗?
亲切的问候!
解决方案
在这段代码中,您似乎只是在写回已经存在的状态,这仅用于重置 TTL 计时器。这可以解释为什么状态没有过期。
Event previous = previousState.get(key);
if (previous != null) {
/*something done here*/
} else
previousState.put(key, previous);
看来您应该使用ValueState
而不是MapState
. ValueState 有效地提供了一个分片键/值存储,其中的键是用于在 keyBy 中对流进行分区的键。MapState 为您提供每个键的嵌套映射,而不是单个值。但是,由于您在 flatMap 中使用了与最初用于为流设置键的键相同的键,因此键分区的 ValueState 似乎就是您所需要的。
推荐阅读
- javascript - 关于照片未显示在我的画廊中的几个问题
- python - While 循环进行 API 调用,直到满足条件
- c# - 以编程方式在 Unity 上绑定脚本属性(编辑模式不在运行时)
- python-3.x - 无服务器 - 找不到好的绑定路径格式
- javascript - Firefox 扩展:如何从 popup.js 获取当前打开的选项卡的文档?
- azure - 我们如何在 dbfs/filestore 上保存或上传 .py 文件
- sql - CURRENT_TIMESTAMP 和 CURRENT_DATE 之间的差异
- java - 如何将@Qualifier 与服务和存储库一起使用
- python - 减去截断的数字:结果没有被截断?
- android - 是否可以实现像延迟这样的运算符,但也会延迟错误?