首页 > 解决方案 > 一个对象 flink 运算符(例如过滤器)或 Apache Flink Job 中的两个对象

问题描述

我有来自不同 Apache Kafka 主题的 4 个输入 DataStreams(JSON 消息)的 Apache Flink 作业,而且我只有一个对象 XFilterFunction - 它进行一些过滤。我写了一些数据管道逻辑(例如原始示例):

FilterFunction<MyEvent> xFilter = new XFilterFunction();

inputDataStream1.filter(xFilter)
.name("Xfilter")
.uid("Xfilter");

inputDataStream2
.union(inputDataStream3)
//here some logics (map, process,...)
.filter(xFilter);

在 Job 中使用一个新对象 XFilterFunction 是好是坏?
还是更好地使用两个新对象 XFilterFunction?(2 个流 -> 2 个新的过滤器对象)

标签: apache-flinkflink-streamingflink-cep

解决方案


如果您多次实例化该类,即

inputDataStream1.filter(new XFilterFunction());
...
inputDataStream2.filter(new XFilterFunction());

应该没有问题。我不太确定状态或覆盖的上下文函数之类的东西是否会显示出不需要的行为。如果它不是专门化的RichFunction,也许甚至只是通过委托发生的纯函数调用,不幸的是我对 Flink 的内部没有那么深入,但是有了上面的解决方案,你应该是安全的。


推荐阅读