apache-flink - 如何合并多个 Keyed Streams,然后在 Merged Stream 上执行自定义聚合功能
问题描述
我的问题的背景如下:
我有一些输入流:DataStream <String> input_stream= . . .
首先,我执行flatMap操作,以便将输入流填充/复制到具有 3 个不同标识符的 3 个相同实例。
structured_stream = input_stream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>
结果是Tuple2<String, Integer>
细绳 | 整数 |
---|---|
输入流 | 1 |
输入流 | 2 |
输入流 | 3 |
因此,中间结果是不同标识符旁边的输入流
然后通过对第二个属性的keyBy操作...
重要的是要澄清,我们从DataStream环境转到KeyedStream环境
KeyedStream<Tuple2<Integer, Integer>> partial_result = structured_stream.keyBy(1).flatMap(new StatefulMap());
我已经实现了一个StatefulMap函数,它本质上是每个键的状态。
static class StatefulMap extends RichFlatMapFunction<Tuple2<String, Integer>,...> {}
每个状态都给我一个输出(通过收集器),其中包含关于给定 input_stream 的答案。
所以,我有三个部分结果。
整数 | 细绳 | 整数(状态标识符) |
---|---|---|
答案 1 | 输入流 | 状态 1 |
答案 2 | 输入流 | 状态 2 |
答案 3 | 输入流 | 状态 3 |
我的问题是:
有什么方法可以从 3 个不同的 KeyedStreams 中收集所有这些部分结果并制作一个通用的自定义聚合/累加器,例如 WeightedVoting(提取一个最终结果)?(不是基于key,而是基于input_stream)
换句话说,有没有办法在 Apache Flink 中合并多个 KeyedStreams 并执行单个聚合函数?
PS我当前的解决方案是在同一个Apache Kafka接收器中编写每个KeyedStream,然后运行第二个作业,根据输入DataStream执行我的自定义聚合功能。本质上,我将 KeyedStreams 转换为一个统一的 DataStream但我添加了第二项工作,即开销。
对于那些想知道我正在 Apache Flink 中实现集成学习的人。
解决方案
推荐阅读
- jquery - Opencart 3 网格系统 Undestand 类别网格视图
- ios - 尽管 Firebase 深层链接成功打开应用程序,但未调用“继续 userActivity”方法
- ios - 使用firebase聊天时如何从下到上设置数据
- outlook - Outlook 插件文档:位置和电子邮件用户的格式
- swift - 如何在swiftUI中忽略具有线性渐变的背景的安全区域?
- primefaces - 惰性搜索在 Primefaces 7.0 中不起作用
- hybris - Hybris 目录 cronjob 同步不起作用
- java - 实现时如何修复文本更改侦听器错误?
- python-3.x - 在 Jupyter Notebook 中隐藏代码单元,使用 Papermill 执行,使用 nbconvert 转换为 PDF
- pyserial - 使用 pyserial 从 COM 端口读取数据,但输出为空