java - Apache Flink 过滤功能
问题描述
我想在 Apache Flink 中实现自定义过滤器功能,但我不知道如何在不硬连接的情况下将过滤条件列表注入其中。
假设我的函数看起来像这样
public class CustomFilter implements FilterFunction{
@Override
public boolean filter(Object o) throws Exception{
String[] values = {"First","Second","Last"}; <-- How can i pass this Array or Collection to my Filter function?
for(String s: values){
if(!o.toString().contains(s)) return false;
}
return true;
}
}
流式传输作业将如下所示:
public class StreamingJob{
...
env
.fromElements("Data","New Data","First")
.filter(new CustomFilter())
.print
.execute();
}
当我尝试向类中的 CustomFilter 函数参数添加某种集合时,例如
public boolean filter(String s, Collection<String> searchValues){
...
}
我收到消息,该函数必须仅来自 String 类型,因为它是一个已实现的函数。
解决方案
正如其他人所指出的,只需保存您通过构造函数传入的目标值列表,并在filter()
方法中使用它们。
public class CustomFilter implements FilterFunction<Object> {
private String[] targetValues;
public CustomFilter(String[] targetValues) {
this.targetValues = targetValues;
}
}
推荐阅读
- amazon-web-services - 在 Athena AWS 中重命名列名
- solr - SOLR 复制计数器不断重置
- php - 如何在搜索时修复 web.php 中的路由而不会出现 404 错误?
- python - 在 AJAX 调用中传递 JSON 对象而不是数组 ->DJANGO
- android - 视图后面的访问表面
- python - ExecuteError:错误 000561:所选图层的关系无效。执行失败(SelectLayerByLocation)
- tizen - SAP:Android 提供者可以初始化与 Tizen 消费者的连接吗?
- python - 如何增加微调器中下拉列表的宽度?
- javascript - 如何将数据添加到将动态创建的 redux store 数组中?
- xquery - 代码点不合法 - SVCCODEPOINT