首页 > 解决方案 > Flink 中的检查点随时间增加

问题描述

汇总到这个问题,我仍然不清楚为什么我的 Flink 作业的检查点会随着时间的推移而增长和增长,目前,运行大约 7 天,这些检查点从未达到稳定状态。目前我使用的是 Flink 1.10 版本,FS State Backend 作为我的工作无法承受使用 RocksDB 的延迟成本。

查看检查点在 7 天内的演变情况: 在此处输入图像描述 假设我为所有有状态操作符中的状态的 TTL 配置了一个小时或更长的时间,在一种情况下为一天:

public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupFullSnapshot().build();

在我看来,所有进入状态的对象都将在过期时间后被清理,因此检查点的大小应该减少,并且我们预计每天的数据量或多或少相同。

另一方面,我们有一条流量曲线,它在一天中的几个小时内有更多的传入数据,但深夜流量下降,所有进入过期状态的对象都应该被清理,导致检查点大小应该减小在流量再次上升之前不会保持相同的大小。

让我们看一下这个用例的代码示例:

DataStream<Event> stream = addSource(source);
KeyedStream<Event, String> keyedStream = stream.filter((FilterFunction<Event>) event ->
                    apply filters here;))
                    .name("Events filtered")
                    .keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction())


public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event>{
private final MapStateDescriptor<String, Event> descriptor = new MapStateDescriptor<>("prev_state", String.class, Event.class);
private MapState<String, Event> previousState;

@Override
    public void open(Configuration parameters) {
        /*ttlConfig described above*/
        descriptor.enableTimeToLive(ttlConfig);
        previousState = getRuntimeContext().getMapState(descriptor);
    }

@Override
    public void flatMap(Event event, Collector<Event> collector) throws Exception {
      final String key = event.rType.equals("something") ? event.id1 : event.id2;
      Event previous = previousState.get(key);
      if(previous != null){
        /*something done here*/
      }else /*something done here*/
        previousState.put(key, previous);
        collector.collect(previous);
 }
}

或多或少这些是用例的结构,还有一些使用 Windows(时间窗口或会话窗口)的结构

问题:

亲切的问候!

标签: apache-flinkflink-streamingflink-cep

解决方案


在这段代码中,您似乎只是在写回已经存在的状态,这仅用于重置 TTL 计时器。这可以解释为什么状态没有过期。

Event previous = previousState.get(key);
if (previous != null) {
  /*something done here*/
} else
  previousState.put(key, previous);

看来您应该使用ValueState而不是MapState. ValueState 有效地提供了一个分片键/值存储,其中的键是用于在 keyBy 中对流进行分区的键。MapState 为您提供每个键的嵌套映射,而不是单个值。但是,由于您在 flatMap 中使用了与最初用于为流设置键的键相同的键,因此键分区的 ValueState 似乎就是您所需要的。


推荐阅读