首页 > 解决方案 > 窗口末尾是否强制清除窗口状态对象?

问题描述

我正在使用窗口 API 将数据划分为 1 小时的窗口。在每个窗口中,我使用一个 Value 状态来为每个窗口存储一个布尔值。

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.days(1)) {
    @Override
    public long extractTimestamp(Event element) {
        return element.timestamp;
    }
})

// Partition by user
.keyBy(new KeySelector<Event, Tuple2<Long, String>>() {
    @Override
    public Tuple2<Long, String> getKey(Event value) {
        return Tuple2.of(value.userGroup, value.userName);
    }
})

.window(TumblingEventTimeWindows.of(Time.minutes(60), Time.minutes(0)))
.allowedLateness(Time.days(1))
.trigger(new WindowTrigger<>(EVENTS_THRESHOLD))
.aggregate(new WindowAggregator(), new WindowProcessor())

.print();
public class WindowProcessor extends ProcessWindowFunction<Long, String, Tuple2<Long, String>, TimeWindow> {

    private final ValueStateDescriptor<Boolean> windowAlertedDescriptor = new ValueStateDescriptor<>("windowAlerted", Boolean.class);

    @Override
    public void process(Tuple2<Long, String> key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
        long currentDownloadsCount = elements.iterator().next();
        long windowStart = context.window().getStart();
        long windowEnd = context.window().getEnd();

        ValueState<Boolean> windowAlertedState = context.windowState().getState(windowAlertedDescriptor);
        if (BooleanUtils.isTrue(windowAlertedState.value())) {
            return;
        }

我是否必须调用“clear()”方法来清理窗口状态数据?我假设因为 Flink 处理窗口创建和清除,它应该在清除窗口时处理状态清理。

根据此处的答案如何在处理键控窗口后立即清除状态? 一旦窗口被触发,窗口就会自动清除它们的状态。

但是 Flink 文档明确提到您应该调用 clear 方法来删​​除窗口状态 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-进程中的窗口状态窗口函数

标签: apache-flinkflink-streaming

解决方案


窗口 API 中涉及的各种类在许多地方保持状态:

  • 分配给每个流记录的列表Window
  • aTrigger可以是有状态的(例如, a CountTrigger
  • 每个窗口状态(在 a 中ProcessWindowFunction.Context
  • 全局状态(也在 a 中ProcessWindowFunction.Context

前两个(Window 内容和 Trigger 状态)在 Window 被清除时由 Flink 自动清理。清除窗口时,Flink 也会调用你的clear方法ProcessWindowFunction,你应该清除你在那个KeyedStateStore windowState()时候创建​​的每个窗口的状态。

另一方面, 的目的KeyedStateStore globalState()是记住从一个窗口到另一个窗口的东西,所以你不会清除它。然而,如果你有一个无界的键空间,你应该注意清理过时键的全局窗口状态。唯一的方法是在全局状态的状态描述符上指定状态 TTL 。


推荐阅读