首页 > 解决方案 > 过滤apache flink中的唯一事件

问题描述

我在一个 java 类中定义了某些变量,并使用不同的类访问它,以便过滤流中的唯一元素。请参考代码以更好地理解问题。

我面临的问题是这个过滤器功能不能很好地过滤独特的事件。我怀疑变量在不同线程之间共享,这是原因!?如果这不是正确的方法,请建议另一种方法。提前致谢。

**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();


**FilterClass.java**
public boolean filter(String val) throws Exception {

       if(ClassWithVariables.uniqueMap.containsKey(key)) {

                Arraylist<String> al = uniqueMap.get(key);

                if(al.contains(val) {
                    return false;
                } else {
                    //Update the hashmap list(uniqueMap)                    
                    return true;    
                }


       } else {

               //Add to hashmap list(uniqueMap)
               return true;
       }

}

标签: apache-flinkflink-streaming

解决方案


对流进行去重的正确方法是通过 key 对流进行分区,这样所有包含相同 key 的元素都将由同一个 worker 处理,并使用 flink 的托管、keyed state 机制,从而使 state 具有容错性和可重新扩展。这是一个示例实现:

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.addSource(new EventSource())
    .keyBy(e -> e.key)
    .flatMap(new Deduplicate())
    .print();

  env.execute();
}

public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
  ValueState<Boolean> seen;

  @Override
  public void open(Configuration conf) {
    ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
    seen = getRuntimeContext().getState(desc);
  }

  @Override
  public void flatMap(Event event, Collector<Event> out) throws Exception {
    if (seen.value() == null) {
      out.collect(event);
      seen.update(true);
    }
  }
}

这也可以实现为 RichFilterFunction,顺便说一句。但请注意,如果您有一个无界的键空间,则使用的状态将无限增长,直到您用完堆或磁盘空间,这取决于您选择的 Flink 的状态后端。如果这是一个问题,您可能希望通过State Time-to-Live设置状态保留策略。

另请注意,在 Flink 管道的不同部分之间共享状态是不可能的。与看起来正常的情况相比,您需要将事情从里到外翻转,并将事件流带入状态,而不是获取它。


推荐阅读