首页 > 解决方案 > Apache Flink:为 DataStream API 添加侧输入

问题描述

在我的Java应用程序中,我有三个 DataStreams。例如,一个流数据是从 Kafka 消费的,另一个流数据是从 Apache Nifi 消费的。对于这两个流的对象类型是不同的。例如,Stream-1 对象类型为 Person,Stream-2 对象类型为 Address。

第三个是广播流(因为这个数据是从 Kafka 消费的)。

现在我想将 Stream-1 和 Stream-2 组合在一个 Job 类中,并希望在任务流程元素中进行拆分。如何实施?

注意: Stream-1 是主流,Stream-2 是侧输入。MainStream 不断从 Kafka 获取数据。对于 Side Input,最初当应用程序启动时,所有表数据都从 DB 加载,然后在表数据更新时(不频繁)读取新数据。

样本结构:

DataStream<Person> stream-1 = env.addSource(read data from kafka)....
DataStream<Address> stream-2 = env.addSource(read data from nifi)....
BroadcastStream<String> BroadCastStream = stream-3.broadcast(read data from kafka);

我被称为以下链接。

DataStream API 的 FLIP-17 侧输入

jira/浏览/FLINK-6131

我的用例是:

使用缓慢演变的数据加入流:我们用于丰富的侧输入随着时间的推移而演变(数据从数据库中读取)。这可以通过在处理主输入之前等待一些初始数据可用并在新数据到达时不断地将新数据摄取到内部输入结构中来完成。

标签: javaapache-flinkapache-nififlink-streamingflink-batch

解决方案


根据最新的回复,@Arvid 的推荐实际上正是这里所需要的。

答案的核心:

即使它们具有不同的类型,您也可以轻松地加入 stream1 和 stream2。然后您可以将广播添加到结果中

文档示例的链接,以及文档中的相关片段(示例太长,无法包含在此处):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

推荐阅读