apache-flink - 窗口末尾是否强制清除窗口状态对象?
问题描述
我正在使用窗口 API 将数据划分为 1 小时的窗口。在每个窗口中,我使用一个 Value 状态来为每个窗口存储一个布尔值。
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.days(1)) {
@Override
public long extractTimestamp(Event element) {
return element.timestamp;
}
})
// Partition by user
.keyBy(new KeySelector<Event, Tuple2<Long, String>>() {
@Override
public Tuple2<Long, String> getKey(Event value) {
return Tuple2.of(value.userGroup, value.userName);
}
})
.window(TumblingEventTimeWindows.of(Time.minutes(60), Time.minutes(0)))
.allowedLateness(Time.days(1))
.trigger(new WindowTrigger<>(EVENTS_THRESHOLD))
.aggregate(new WindowAggregator(), new WindowProcessor())
.print();
public class WindowProcessor extends ProcessWindowFunction<Long, String, Tuple2<Long, String>, TimeWindow> {
private final ValueStateDescriptor<Boolean> windowAlertedDescriptor = new ValueStateDescriptor<>("windowAlerted", Boolean.class);
@Override
public void process(Tuple2<Long, String> key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
long currentDownloadsCount = elements.iterator().next();
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
ValueState<Boolean> windowAlertedState = context.windowState().getState(windowAlertedDescriptor);
if (BooleanUtils.isTrue(windowAlertedState.value())) {
return;
}
我是否必须调用“clear()”方法来清理窗口状态数据?我假设因为 Flink 处理窗口创建和清除,它应该在清除窗口时处理状态清理。
根据此处的答案如何在处理键控窗口后立即清除状态? 一旦窗口被触发,窗口就会自动清除它们的状态。
但是 Flink 文档明确提到您应该调用 clear 方法来删除窗口状态 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-进程中的窗口状态窗口函数
解决方案
窗口 API 中涉及的各种类在许多地方保持状态:
- 分配给每个流记录的列表
Window
- a
Trigger
可以是有状态的(例如, aCountTrigger
) - 每个窗口状态(在 a 中
ProcessWindowFunction.Context
) - 全局状态(也在 a 中
ProcessWindowFunction.Context
)
前两个(Window 内容和 Trigger 状态)在 Window 被清除时由 Flink 自动清理。清除窗口时,Flink 也会调用你的clear
方法ProcessWindowFunction
,你应该清除你在那个KeyedStateStore windowState()
时候创建的每个窗口的状态。
另一方面, 的目的KeyedStateStore globalState()
是记住从一个窗口到另一个窗口的东西,所以你不会清除它。然而,如果你有一个无界的键空间,你应该注意清理过时键的全局窗口状态。唯一的方法是在全局状态的状态描述符上指定状态 TTL 。
推荐阅读
- mysql - left join 和 count 在 MYSQL 中无法正常工作
- python - Python 3 按键检测
- r - R:创建一个直方图,显示每个大陆的贸易余额分布
- python - 如何有效地将数组解码为熊猫数据框中的列
- python - 一个热门编码器是行业规范,在训练/拆分之前或之后进行编码
- java - API 服务中的数据不更新,仅在清除缓存后更新
- mongoose - 如何防止猫鼬创建集合?
- .htaccess - 将单页限制为用户代理?
- javascript - 如果文本框 2 中有相同的多个值,则 jQuery 验证文本框 1
- node.js - NodeJS 中巨大的 JSON 对象处理