首页 > 解决方案 > Apache Flink:两个(或多个)任务管理器之间的共享状态

问题描述

假设我有两个任务管理器,每个都只有一个任务槽。现在,我有以下工作:

    KeyedStream<...> streamA = env.addSource(...).keyBy(...);
    KeyedStream<...> streamB = env.addSource(...).keyBy(...);

    streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);

一个任务管理器将使用来自 Kafka 主题的数据,另一个将使用来自另一个 Kafka 主题的数据。

我将作业发送给作业管理器以执行它。Flink 分配两个任务管理器来处理 flatMap(因为一个任务管理器只有一个任务槽)。

flatMap 在事件之间进行简单的连接(使用两个键控状态):

    public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
        private ValueState<A> AState;
        private ValueState<B> BState;

        @Override
        public void open(Configuration config) {
            AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
            BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
        }

        @Override
        public void flatMap1(A event, Collector<String> out) throws Exception {
            B secondEvent = BState.value();

            if (secondEvent == null)
                AState.update(event);
            else {
                out.collect(...);
                BState.clear();
            }
        }

        @Override
        public void flatMap2(A event, Collector<String> out) throws Exception {
            A firstEvent = AState.value();

            if (firstEvent == null)
                BState.update(event);
            else {
                out.collect(...);
                AState.clear();
            }
        }
    }

如果我理解正确的话,在 connect 方法之后,流就变成了只有一个。现在,实现的 flatMap 需要共享状态,因为操作员必须控制相关事件是否到达以应用连接,但它以两个并行执行,因此使用两个任务管理器。这意味着每次必须更新状态时,任务管理器都应该保存在另一个任务管理器的状态中(在连接方法之后共享),或者它可能需要简单地读取状态。那么任务管理器如何通信呢?是否会影响性能,因为任务管理器可能在不同的集群节点上运行?

编辑:我在 Flink 的博客上找到了以下文章,似乎两个任务管理器可以通过 TCP 连接进行通信,这对我来说很有意义,因为在某些情况下我们需要在事件之间共享状态。如果这是错误的,您能否向我解释一下Flink 如何管理以下场景?

假设总是有两个任务管理器,物理上位于两个集群节点上。每个任务管理器始终只有一个插槽。我运行上述作业并将并行度设置为 2(例如,使用-p将作业发送到作业管理器时的参数)。现在,Flink 将从我的作业中创建两个结构相同的子任务,并将它们发送到任务管理器。两个任务管理器都将执行“相同”的作业,但使用不同的事件。该作业消耗来自两个 Kafka 主题的事件:A 和 B。这意味着第一个和第二个任务管理器将同时消耗来自主题 A 和 B 的事件,但事件不同,否则会有重复。作业是相同的,即它执行上面的 RichCoFlatMapFunction,然后每个任务管理器将在本地处理其消费事件集和个人本地状态。现在问题来了:假设第一个任务管理器已经消费了一个键为“1”的事件。此事件到达 RichCoFlatMapFunction 内部,并存储在状态中,因为操作员仍在等待另一个具有相同键的事件来产生连接。如果另一个具有“1”键的事件从第二个任务管理器中被消耗,并且它们不共享它们的状态或通信,则不可能进行加入。我的推理有什么问题?

标签: joinstateapache-flink

解决方案


两个任务管理器不需要为了状态共享而进行通信——在 Flink 中没有状态共享。

下面显示的这三个执行图中的任何一个都是可能的,具体取决于您如何安排源的详细信息。在每个图的左侧,我们看到 A 和 B 的源运算符,在右侧,两个输入运算符的两个并行实例通过 RichCoFlatMap 实现连接。

在此处输入图像描述

keyBy 不是运算符,而是指定源和两个 RichCoFlatMap 实例如何连接。它安排这是一个散列连接,对源流进行重新分区。

使用这三种场景中的哪一种并不重要,因为在所有三种情况下,keyBy 将具有相同的效果,即会将某些键的所有事件引导至 Join1,并将其他键的所有事件引导至 Join2。

换句话说,对于任何给定的键,该键的所有事件都将在同一个任务槽中处理。您可以将其ValueState<A>视为分布式(分片)键/值存储,其中值的类型为 A。每个任务管理器都有该键/值存储切片的状态(用于键的不相交子集),并处理这些键的所有事件(并且只有那些键)。

例如:在flatMap1中,当BState.value()使用 from 的元素调用时streamA,Flink 运行时将访问BState 当前在 context 中streamA的键的值,这意味着与当前正在处理的事件的键关联的值。在当前任务中,此状态将始终是本地的。同样,flatMap2将始终使用来自 的元素调用streamB

这种设计避免了任务管理器之间的任何耦合,有利于可扩展性和性能。


推荐阅读