首页 > 解决方案 > Flink - 键控过程功能中的Java类成员

问题描述

我有以下 flink keyedprocessfunction。我基本上是在尝试实现状态设计模式。

public AlertProcessor extends KeyedProcessFunction<Tuple2<String, String>, Event1, Event2> {

   private transient AlertState currentState;
   private transient AlertState activeAlertState;
   private transient AlertState noActiveAlertState;
   private transient AlertState resolvedAlertState;

   @Override
   public void open(Configuration parameters) {
      activeAlertState = new ActiveAlertState();
      noActiveAlertState = new NoActiveAlertState();
      resolvedAlertState = new ResolvedAlertState();
   }


   @Override
   public processElement(Event1 event1, Context ctx, Collector<Event2> out) throws Exception {

        // Would the below if condition work for multiple keys?
        if (currentAlertState == null) {
            currentAlertState = noActiveAlertState;
        }

        currentAlertState.handle(event1, out);
   }

   private interface AlertState {
        void handle(Event1 event1, Collector<Event2> out);
   } 

   private class ActiveAlertState implements AlertState {
       void handle(Event1 event1, Collector<Event2> out) {
           logger.debug("Moving to no alertState");
           
           // Do something and push some Event2 to out
           currentAlertState = resolvedActiveAlertState;
       }
   }
 
   private class NoActiveAlertState implements AlertState {
       void handle(Event1 event1, Collector<Event2> out) {
           logger.debug("Moving to no alertState");
            
           // Do something and push some Event2 to out
           currentAlertState = activeAlertState;
       }
   }

   private class ResolvedAlertState implements AlertState {
       
       void handle(Event1 event1, Collector<Event2> out) {
           logger.debug("Moving to no alertState");
           
           // Do something and push some Event2 to out
           currentAlertState = noActiveAlertState;
       }
   }

}

我的问题是——

  1. 流中的每个键是否会有一个 AlertProcessor 实例(或对象)?换句话说, currentAlertState 对象是否每个键都是唯一的?或者这个 AlertProcessor 操作符的每个实例都会有一个 currentAlertState?

如果 currentAlertState 是运算符的每个实例,那么此代码将不会真正起作用,因为 currentAlertState 将被不同的键覆盖。我的理解正确吗?

  1. 我可以将 currentAlertState 存储为键控状态,并为每个 processElement() 调用初始化它。如果我这样做,我不需要在 handle() 实现中将 currentAlertState 分配或设置为下一个状态,因为 currentAlertState 无论如何都会根据 flink 状态进行初始化。

  2. 有没有更好的方法在 flink 中实现状态设计模式并且仍然减少创建的状态对象的数量?

标签: apache-flinkflink-streaming

解决方案


将在管道的每个并行实例(每个任务槽)中创建一个AlertProcessor实例,并将在该槽处理的所有键上多路复用。

如果 currentAlertState 是运算符的每个实例,那么此代码将不会真正起作用,因为 currentAlertState 将被不同的键覆盖。我的理解正确吗?

正确的。您应该对 使用键控状态currentAlertState,这将导致在状态后端中为每个不同的键创建一个条目。


推荐阅读