首页 > 解决方案 > Apache Flink 过滤功能

问题描述

我想在 Apache Flink 中实现自定义过滤器功能,但我不知道如何在不硬连接的情况下将过滤条件列表注入其中。

假设我的函数看起来像这样

public class CustomFilter implements FilterFunction{

  @Override
  public boolean filter(Object o) throws Exception{
  String[] values = {"First","Second","Last"}; <-- How can i pass this Array or Collection to my Filter function?
     for(String s: values){
        if(!o.toString().contains(s)) return false;
     }
  return true;
  }
}

流式传输作业将如下所示:

public class StreamingJob{
...
env 
    .fromElements("Data","New Data","First")
    .filter(new CustomFilter())
    .print
    .execute();
}

当我尝试向类中的 CustomFilter 函数参数添加某种集合时,例如

public boolean filter(String s, Collection<String> searchValues){
    ...
}

我收到消息,该函数必须仅来自 String 类型,因为它是一个已实现的函数。

标签: javaapache-flinkflink-streaming

解决方案


正如其他人所指出的,只需保存您通过构造函数传入的目标值列表,并在filter()方法中使用它们。

public class CustomFilter implements FilterFunction<Object> {

    private String[] targetValues;

    public CustomFilter(String[] targetValues) {
        this.targetValues = targetValues;
    }


}

推荐阅读