apache-flink - 过滤apache flink中的唯一事件
问题描述
我在一个 java 类中定义了某些变量,并使用不同的类访问它,以便过滤流中的唯一元素。请参考代码以更好地理解问题。
我面临的问题是这个过滤器功能不能很好地过滤独特的事件。我怀疑变量在不同线程之间共享,这是原因!?如果这不是正确的方法,请建议另一种方法。提前致谢。
**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();
**FilterClass.java**
public boolean filter(String val) throws Exception {
if(ClassWithVariables.uniqueMap.containsKey(key)) {
Arraylist<String> al = uniqueMap.get(key);
if(al.contains(val) {
return false;
} else {
//Update the hashmap list(uniqueMap)
return true;
}
} else {
//Add to hashmap list(uniqueMap)
return true;
}
}
解决方案
对流进行去重的正确方法是通过 key 对流进行分区,这样所有包含相同 key 的元素都将由同一个 worker 处理,并使用 flink 的托管、keyed state 机制,从而使 state 具有容错性和可重新扩展。这是一个示例实现:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicate())
.print();
env.execute();
}
public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> seen;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
seen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (seen.value() == null) {
out.collect(event);
seen.update(true);
}
}
}
这也可以实现为 RichFilterFunction,顺便说一句。但请注意,如果您有一个无界的键空间,则使用的状态将无限增长,直到您用完堆或磁盘空间,这取决于您选择的 Flink 的状态后端。如果这是一个问题,您可能希望通过State Time-to-Live设置状态保留策略。
另请注意,在 Flink 管道的不同部分之间共享状态是不可能的。与看起来正常的情况相比,您需要将事情从里到外翻转,并将事件流带入状态,而不是获取它。
推荐阅读
- windows - 如何用另一个 bat 文件或程序关闭 cmd 窗口打开的 bat 文件
- powerbi - 将 Azure Blob 容器连接到 Power BI 服务
- java - 请帮助我了解静态对象
- reactjs - 在 Gatsby / React 中将文件作为字符串(或源资产)导入
- java - GCP 签名的 url 将内容处置设置为内联
- powershell - 使用 Excel 定义 ARM 参数变量
- python - 如何减少 selenium python 中的 chromedriver cpu 使用率?
- javascript - Vue 的 Mutations 和 Actions 不起作用
- javascript - (已解决)二进制/十六进制(或任何其他基数)的行为类似于输入类型=数字
- php - PHP:成员函数无法访问成员变量