join - Apache Flink:两个(或多个)任务管理器之间的共享状态
问题描述
假设我有两个任务管理器,每个都只有一个任务槽。现在,我有以下工作:
KeyedStream<...> streamA = env.addSource(...).keyBy(...);
KeyedStream<...> streamB = env.addSource(...).keyBy(...);
streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);
一个任务管理器将使用来自 Kafka 主题的数据,另一个将使用来自另一个 Kafka 主题的数据。
我将作业发送给作业管理器以执行它。Flink 分配两个任务管理器来处理 flatMap(因为一个任务管理器只有一个任务槽)。
flatMap 在事件之间进行简单的连接(使用两个键控状态):
public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
private ValueState<A> AState;
private ValueState<B> BState;
@Override
public void open(Configuration config) {
AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
}
@Override
public void flatMap1(A event, Collector<String> out) throws Exception {
B secondEvent = BState.value();
if (secondEvent == null)
AState.update(event);
else {
out.collect(...);
BState.clear();
}
}
@Override
public void flatMap2(A event, Collector<String> out) throws Exception {
A firstEvent = AState.value();
if (firstEvent == null)
BState.update(event);
else {
out.collect(...);
AState.clear();
}
}
}
如果我理解正确的话,在 connect 方法之后,流就变成了只有一个。现在,实现的 flatMap 需要共享状态,因为操作员必须控制相关事件是否到达以应用连接,但它以两个并行执行,因此使用两个任务管理器。这意味着每次必须更新状态时,任务管理器都应该保存在另一个任务管理器的状态中(在连接方法之后共享),或者它可能需要简单地读取状态。那么任务管理器如何通信呢?是否会影响性能,因为任务管理器可能在不同的集群节点上运行?
编辑:我在 Flink 的博客上找到了以下文章,似乎两个任务管理器可以通过 TCP 连接进行通信,这对我来说很有意义,因为在某些情况下我们需要在事件之间共享状态。如果这是错误的,您能否向我解释一下Flink 如何管理以下场景?
假设总是有两个任务管理器,物理上位于两个集群节点上。每个任务管理器始终只有一个插槽。我运行上述作业并将并行度设置为 2(例如,使用-p将作业发送到作业管理器时的参数)。现在,Flink 将从我的作业中创建两个结构相同的子任务,并将它们发送到任务管理器。两个任务管理器都将执行“相同”的作业,但使用不同的事件。该作业消耗来自两个 Kafka 主题的事件:A 和 B。这意味着第一个和第二个任务管理器将同时消耗来自主题 A 和 B 的事件,但事件不同,否则会有重复。作业是相同的,即它执行上面的 RichCoFlatMapFunction,然后每个任务管理器将在本地处理其消费事件集和个人本地状态。现在问题来了:假设第一个任务管理器已经消费了一个键为“1”的事件。此事件到达 RichCoFlatMapFunction 内部,并存储在状态中,因为操作员仍在等待另一个具有相同键的事件来产生连接。如果另一个具有“1”键的事件从第二个任务管理器中被消耗,并且它们不共享它们的状态或通信,则不可能进行加入。我的推理有什么问题?
解决方案
两个任务管理器不需要为了状态共享而进行通信——在 Flink 中没有状态共享。
下面显示的这三个执行图中的任何一个都是可能的,具体取决于您如何安排源的详细信息。在每个图的左侧,我们看到 A 和 B 的源运算符,在右侧,两个输入运算符的两个并行实例通过 RichCoFlatMap 实现连接。
keyBy 不是运算符,而是指定源和两个 RichCoFlatMap 实例如何连接。它安排这是一个散列连接,对源流进行重新分区。
使用这三种场景中的哪一种并不重要,因为在所有三种情况下,keyBy 将具有相同的效果,即会将某些键的所有事件引导至 Join1,并将其他键的所有事件引导至 Join2。
换句话说,对于任何给定的键,该键的所有事件都将在同一个任务槽中处理。您可以将其ValueState<A>
视为分布式(分片)键/值存储,其中值的类型为 A。每个任务管理器都有该键/值存储切片的状态(用于键的不相交子集),并处理这些键的所有事件(并且只有那些键)。
例如:在flatMap1
中,当BState.value()
使用 from 的元素调用时streamA
,Flink 运行时将访问BState
当前在 context 中streamA
的键的值,这意味着与当前正在处理的事件的键关联的值。在当前任务中,此状态将始终是本地的。同样,flatMap2
将始终使用来自 的元素调用streamB
。
这种设计避免了任务管理器之间的任何耦合,有利于可扩展性和性能。
推荐阅读
- xamarin.ios - 更改所有应用程序 Xamarin iOS 的 UIView 大小
- php - 从 Woocommerce 产品页面中删除指向单个页面的链接
- apache - 来自 Nginx 访问日志的数据流 -> Rsyslog 或 Syslog -> Fluentd -> Kinesis
- django - 在 Django Rest 中覆盖 get 方法时出错
- html - 图像左侧有额外的空白区域。任何事情都有帮助
- gradle - 升级到 gradle-5 后,war 文件很大
- casting - 将 int 转换为 xquery 中对应的 ascii
- javascript - MSEdge 浏览器中 canvas.toBlob() 的兼容性
- reactjs - 如何在 ag 网格自定义单元格编辑器中按下箭头键和输入键时做出 event.stopPropagation() 以反应选择输入字段
- c++ - 动态数组 (GCC) 和指针之间的区别