How does contextService work internally in Flink?

Problem description

Currently I am using Flink with a Kafka consumer. From the Kafka consumer stream I listen for task event changes (task-created, task-updated and task-closed events). I have a use case where I need to check that no task update has occurred within 135 minutes of a task being created. For this use case I use contextService.setTimer(....). Now I am confused about the limit on the number of timers that can be set, i.e. if 100,000 new tasks are created, then 100,000 timers will be registered at different timestamps.

So basically I want to know:

  • Is this feasible?
  • Is this the best way to solve my use case?

Code

   private static class MatchFunction extends KeyedProcessFunction<String, Tuple5<String, String, String, String, Long>, Object> {
        private ValueState<Tuple5<String, String, String, String, Long>> taskState;

        public MatchFunction() {}

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple5<String, String, String, String, Long>> stateDescriptor =
                    new ValueStateDescriptor<>("SLA Breach task event", TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class, String.class, Long.class));
            taskState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void processElement(Tuple5<String, String, String, String, Long> event, Context context, Collector<Object> out) throws Exception {
            Tuple5<String, String, String, String, Long> previousEvent = taskState.value();
            String taskStatus = event.f1;
            String taskId = event.f0;
            long createdAt = event.f4;

            if(previousEvent != null && previousEvent.f1.equalsIgnoreCase(taskStatus)) {
                // might be duplicate event
                return;
            }

            if (previousEvent == null) {
                if (isNewEvent(event)) {
                    taskState.update(event);
                    long scheduleTime = Utils.getTimerTime(createdAt, Time.minutes(INT_135));
                    context.timerService().registerEventTimeTimer(scheduleTime);
                    logger.info("Adding event -> {} in taskState, Schedule time -> {}", event, new Date(scheduleTime));
                }
            } else {
                if (isUpdatedEvent(event)) {
                    // This is an UPDATE event, so the event saved in state was the START event
                    // and its timer has not fired yet, so we can safely delete it.
                    // The timer must be deleted with the exact timestamp it was registered with,
                    // i.e. the one derived from the saved START event's createdAt (previousEvent.f4).
                    context.timerService().deleteEventTimeTimer(Utils.getTimerTime(previousEvent.f4, Time.minutes(INT_135)));
                    logger.info("Cancelled timer as an update event was received for event - {}", event);
                    // The update event has now been seen, so we can clear the state
                    taskState.clear();
                }
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext context, Collector<Object> out) throws Exception {
            // The timer fired: no UPDATE event was received within 135 minutes of task creation.
            Tuple5<String, String, String, String, Long> event = taskState.value();
            String taskId = event.f0;
            long createdAt = event.f4;
            // do something here after the timer fires (e.g. emit an SLA-breach record)...
        }
    }
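For reference, here is a minimal sketch of how MatchFunction would typically be wired into the job. The source variable, keying by taskId (f0), and the 30-second bounded out-of-orderness watermark strategy are illustrative assumptions; note that event-time timers only fire once the watermark passes their timestamp, so the stream needs timestamps and watermarks assigned.

    DataStream<Tuple5<String, String, String, String, Long>> taskEvents = ...; // e.g. a Kafka source

    taskEvents
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<Tuple5<String, String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                            .withTimestampAssigner((event, ts) -> event.f4))
            .keyBy(event -> event.f0)          // key by taskId, so state and timers are per task
            .process(new MatchFunction());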

Tags: apache-flink, flink-streaming

Solution


A few key facts about Flink timers:

  • Timers are always keyed.
  • Timers are cheap: you can have millions or even billions of them.
  • Timers can live either on the heap or in RocksDB (see the configuration sketch at the end of this answer).
  • Timers are checkpointed and are restored after a failure.
  • Timers are deduplicated per (key, timestamp); that is, for a given key and timestamp there is at most one timer. You can sometimes reduce the number of timers created by exploiting this deduplication to coalesce timers, as shown in the sketch below.
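As a rough illustration of that last point (the helper name and the one-second granularity below are assumptions, not part of the original answer): if the 135-minute check only needs coarse precision, the timer target can be rounded down so that repeated registrations for the same key collapse into a single timer.

    // Hypothetical helper: round the timer target down to a coarser granularity.
    // Flink deduplicates timers per (key, timestamp), so every registration that
    // produces the same rounded timestamp for the same key results in one timer only.
    private static long coalescedTimerTime(long createdAt, long delayMillis, long granularityMillis) {
        long target = createdAt + delayMillis;
        return target - (target % granularityMillis);
    }

    // Usage inside processElement (sketch):
    //   long timerTime = coalescedTimerTime(createdAt, Time.minutes(135).toMilliseconds(), 1000L);
    //   context.timerService().registerEventTimeTimer(timerTime);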

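Regarding the heap-vs-RocksDB point: when the RocksDB state backend is in use, where timers are stored is controlled by the state.backend.rocksdb.timer-service.factory option, normally set in flink-conf.yaml. A minimal programmatic sketch, assuming the configuration is passed to the job:

    // Sketch, assuming the RocksDB state backend is in use; the same key can be set
    // in flink-conf.yaml instead of programmatically.
    Configuration flinkConfig = new Configuration();
    // Store timers in RocksDB, so very large numbers of timers spill to disk:
    flinkConfig.setString("state.backend.rocksdb.timer-service.factory", "ROCKSDB");
    // ...or keep timers on the JVM heap (lower latency, but bounded by memory):
    // flinkConfig.setString("state.backend.rocksdb.timer-service.factory", "HEAP");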