apache-flink - 在 flink 内部 contextService 是如何工作的?
问题描述
目前,我正在将 flink 与 kafka 消费者一起使用。我从 kafka 消费者流中监听任务事件更改(任务创建事件、任务更新事件、任务关闭事件)。我有一个用例,我需要检查在任务创建后135 分钟内没有发生任务更新。对于这个用例,我使用contextService.setTimer(....)。现在,我对可以设置的定时器数量的限制感到困惑。即如果创建了100000个新任务,那么,100000个计时器将设置为不同的 - 不同的时间戳。
可行吗?
这是解决我的用例的最佳方法吗?
所以基本上我想知道。
- contextService 在内部是如何工作的?
- 可以设置多少个定时器(定时器数)?
代码
private static class MatchFunction extends KeyedProcessFunction<String, Tuple5<String, String, String, String, Long>, Object> {
private ValueState<Tuple5<String, String, String, String, Long>> taskState;
public MatchFunction() {}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple5<String, String, String, String, Long>> stateDescriptor =
new ValueStateDescriptor<>("SLA Breach task event", TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class, String.class, Long.class));
taskState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(Tuple5<String, String, String, String, Long> event, Context context, Collector<Object> out) throws Exception {
Tuple5<String, String, String, String, Long> previousEvent = taskState.value();
String taskStatus = event.f1;
String taskId = event.f0;
long createdAt = event.f4;
if(previousEvent != null && previousEvent.f1.equalsIgnoreCase(taskStatus)) {
// might be duplicate event
return;
}
if (previousEvent == null) {
if (isNewEvent(event)) {
taskState.update(event);
long scheduleTime = Utils.getTimerTime(createdAt, Time.minutes(INT_135));
context.timerService().registerEventTimeTimer(scheduleTime);
logger.info("Adding event -> {} in taskState, Schedule time -> {}", event, new Date(scheduleTime));
}
} else {
if (isUpdatedEvent(event)) {
// it's an UPDATE event, so event saved was the START event and has a timer
// the timer hasn't fired yet, and we can safely kill the timer
context.timerService().deleteEventTimeTimer(Utils.getTimerTime(createdAt, Time.minutes(INT_135)));
logger.info("Cancelled timer as Updation event was received for event - {} ", event);
// Update events have now been seen, we can clear the state
taskState.clear();
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Object> out) throws Exception {
// no UPDATE event is received from past 135 minutes.
Tuple5<String, String, String, String, Long> event = taskState.value();
String taskId = event.f0;
long createdAt = event.f4;
// do something here after timer is received...
}
}
}
}
解决方案
关于 Flink 定时器的几个关键事实:
- 定时器总是键控的。
- 计时器很便宜:您可以拥有数百万或数十亿个计时器。
- 定时器既可以存在于堆上,也可以存在于 RocksDB 中。
- 定时器有检查点,失败后会恢复。
- 计时器在(键,时间戳)级别进行重复数据删除。也就是说,对于一个特定的key和timestamp,最多只有一个timer。有时,您可以通过利用这种重复数据删除来合并计时器来减少创建的计时器数量。
推荐阅读
- r - 如何在嵌套函数中使用包“purrr”中的“map”?
- java - 如何继续测试使用 jMockIt 接收“类”参数的函数?
- regex - 尽管似乎可以在在线测试仪上工作,但 Perl 正则表达式没有被替换
- python - 在不分享源代码的情况下展示python项目
- mysql - 比较多个记录中的 2 个日期
- python - 使用 Python 在块中下载文件时如何获取 ETA?
- java - Java DOM XML 编辑 - 添加和删除元素非常慢
- ajax - 为什么 ajax 调用只显示 URL 中的数据?
- linux - 如何并行而不是顺序运行 bash 脚本?
- angular - 拆分 observable 中的每个单词(去抖直到空格?)