java - 如何同时处理具有不同数据类型的 DataStream
问题描述
假设我有一个处理 aDataStream<X>
并将返回值发送到 DB 的函数,但是我需要从另一个源读取,并且在处理这个新的 DataStream 时,我需要找到在将其存储DataStream<X>
到 DB 之前可以生成的状态和找到它进入的一个 ID,DataStream<Y>
然后触发一个动作。
我的问题是:
Co-ProcessFunction
是否可以通过在 Flink 中使用例如 a来处理转换的结果DataStream<X>
并在那里创建状态,同时处理DataStream<Y>
在同一个运算符中具有状态和新流?
如果第一个问题完全错误,这是可能的,有什么办法可以做我需要做的吗?
希望有人能理解我需要做什么。
解决方案
是的,可以连接两个不同类型的流,并使用共享状态一起处理它们。
为了与 连接Stream<X>
并Stream<Y>
让它们共享状态,您必须定义键选择器函数,该函数返回两个流的等效键。(就像在 SQL 中一样,为了连接两个表,您必须描述如何连接它们。)
在这个伪代码中,anotherFlinkFunction
是一个RichCoFlatMapFunction
. 我假设id
当流 X 和流 Y 中的项目应该组合时,两个流都有一个具有相同值的字段。
x = env.addSource(...);
xTransformed = x.flatMap(...);
xTransformed.addSink(DB);
y = env.addSource(...);
z = xTransformed
.connect(y)
.keyBy(xt->xt.id, y->y.id)
.flatMap(new anotherFlinkFunction());
z.addSink(...);
您可以在https://ci.apache.org/projects/flink/flink-docs-stable/learn-flink/etl.html#example的 Apache Flink 培训教程和 https 的随附练习中找到相关示例: //github.com/apache/flink-training/tree/master/rides-and-fares。
推荐阅读
- powershell - Jenkins to Powershell - 我需要在传递的参数中转义美元符号吗?
- javascript - 试图在 React 中隐藏 Calendar 的选择输入和页脚元素
- ruby - 使用 Raven.send_event 发送附加信息
- vue.js - Vue Test Utils - 跳过创建的钩子
- c++ - 将 Scala 函数移植到 C++
- msal - 如果用户从 msal .net 代码登录,则无法使用 msal.js 静默获取访问令牌
- hyperledger-fabric - Hyperledger Fabric 背书失败:txid 存在
- python - 我有一个包含字符串和整数的数据集,如何编写一个只读取 Python 上的整数值的程序?
- javascript - “请求 vscode/内容意外失败,未提供任何详细信息。”
- php - get_the_category 生成错误:Call to undefined function __() in taxonomy.php