Apache Flink - ConnectedStream ordering and backpressure

Problem description

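The sample code below uses two source functions: one emits the even numbers from 0 to 18, the other emits the odd numbers from 1 to 19. The two streams are connected, the operator emits the union of both, and the results are printed.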

Sample code:

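import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

// Arbitrary wrapper class; the snippet was the body of a main method.
public class ConnectedStreamsOrder {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
        // Source A: even numbers 0..18, one per second
        .addSource(new SourceFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 0; running && i < 20; i = i + 2) {
                    System.out.println(System.currentTimeMillis() + ":SourceA: " + i);
                    ctx.collect(i);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false; // stop the loop when the job is cancelled
            }
        })
        // Source B: odd numbers 1..19, one per second
        .connect(env.addSource(new SourceFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 1; running && i < 20; i = i + 2) {
                    System.out.println(System.currentTimeMillis() + ":SourceB: " + i);
                    ctx.collect(i);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }))
        .process(new CoProcessFunction<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            // Elements from the first input (source A)
            @Override
            public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                System.out.println(System.currentTimeMillis() + ":OperatorA: " + value);
                Thread.sleep(5000); // simulate slow processing
                out.collect(value);
            }

            // Elements from the second input (source B)
            @Override
            public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                System.out.println(System.currentTimeMillis() + ":OperatorB: " + value);
                Thread.sleep(5000); // simulate slow processing
                out.collect(value);
            }
        })
        .addSink(new SinkFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void invoke(Integer value) throws Exception {
                System.out.println(System.currentTimeMillis() + ":Sink: " + value);
            }
        });

        env.execute();
    }
}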

Output

1578465207355:SourceB: 1
1578465207379:SourceA: 0
1578465207437:OperatorA: 0
1578465208360:SourceB: 3
1578465208380:SourceA: 2
1578465209364:SourceB: 5
1578465209383:SourceA: 4
1578465210366:SourceB: 7
1578465210386:SourceA: 6
1578465211369:SourceB: 9
1578465211390:SourceA: 8
1578465212370:SourceB: 11
1578465212394:SourceA: 10
1578465212440:Sink: 0
1578465212441:OperatorB: 1
1578465213375:SourceB: 13
1578465213399:SourceA: 12
1578465214379:SourceB: 15
1578465214401:SourceA: 14
1578465215383:SourceB: 17
1578465215406:SourceA: 16
1578465216388:SourceB: 19
1578465216409:SourceA: 18
1578465217441:Sink: 1
1578465217441:OperatorB: 3
1578465222446:Sink: 3
1578465222446:OperatorB: 5
1578465227448:Sink: 5
1578465227449:OperatorB: 7
1578465232452:Sink: 7
1578465232453:OperatorB: 9
1578465237453:Sink: 9
1578465237453:OperatorB: 11
1578465242456:Sink: 11
1578465242456:OperatorA: 2
1578465247462:Sink: 2
1578465247462:OperatorA: 4
1578465252467:Sink: 4
1578465252467:OperatorA: 6

Q1.

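Flink should hand the CoProcessFunction whichever item arrives first on either of the connected streams. However, what we see here is that the number 2 was produced by its source function before the number 11, yet 11 was passed to the CoProcessFunction before 2. Why is that?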

Q2.

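No backpressure occurs in the connected stream. The source functions run to completion even though their elements are still being processed by the operator (simulated by Thread.sleep in the code above). Is there a way to get backpressure with connected streams?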

Code edit V2

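import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

// Arbitrary wrapper class; the snippet was the body of a main method.
public class ConnectedStreamsBackpressureV2 {

    public static void main(String[] args) throws Exception {
        // Shrink the network buffer pool so that backpressure shows up sooner.
        Configuration config = new Configuration();
        config.setInteger("taskmanager.network.numberOfBuffers", 4);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);
        env.setParallelism(1);

        env
        // Source A: even numbers below 50000, emitted as fast as possible (no sleep)
        .addSource(new SourceFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 0; running && i < 50000; i = i + 2) {
                    System.out.println(System.currentTimeMillis() + ":SourceA: " + i);
                    ctx.collect(i);
                }
            }

            @Override
            public void cancel() {
                running = false; // stop the loop when the job is cancelled
            }
        })
        // Source B: odd numbers below 50000, emitted as fast as possible (no sleep)
        .connect(env.addSource(new SourceFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 1; running && i < 50000; i = i + 2) {
                    System.out.println(System.currentTimeMillis() + ":SourceB: " + i);
                    ctx.collect(i);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }))
        .process(new CoProcessFunction<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            // Elements from the first input (source A)
            @Override
            public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                System.out.println(System.currentTimeMillis() + ":OperatorA: " + value);
                Thread.sleep(5000); // simulate slow processing
                out.collect(value);
            }

            // Elements from the second input (source B)
            @Override
            public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                System.out.println(System.currentTimeMillis() + ":OperatorB: " + value);
                Thread.sleep(5000); // simulate slow processing
                out.collect(value);
            }
        })
        .addSink(new SinkFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void invoke(Integer value) throws Exception {
                System.out.println(System.currentTimeMillis() + ":Sink: " + value);
            }
        });

        env.execute();
    }
}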

Output

1578605461497:SourceB: 7279
1578605461497:SourceB: 7281
1578605466406:Sink: 1
1578605466406:OperatorB: 3 <---- only odd numbers (input B) in the output
1578605471411:Sink: 3
1578605471411:OperatorB: 5
1578605476414:Sink: 5
1578605476415:OperatorB: 7
1578605481415:Sink: 7
1578605481415:OperatorB: 9
1578605486417:Sink: 9
1578605486417:OperatorB: 11
1578605491422:Sink: 11
1578605491422:OperatorB: 13
1578605496427:Sink: 13
1578605496427:OperatorB: 15
1578605501432:Sink: 15
1578605501432:OperatorB: 17
1578605506434:Sink: 17
1578605506434:OperatorB: 19
1578605511435:Sink: 19
1578605511435:OperatorB: 21
1578605516435:Sink: 21
1578605516436:OperatorB: 23
1578605521436:Sink: 23
1578605521436:OperatorB: 25
1578605526440:Sink: 25
1578605526440:OperatorB: 27
1578605531443:Sink: 27
1578605531443:OperatorB: 29
1578605536447:Sink: 29
1578605536447:OperatorB: 31
1578605541452:Sink: 31
1578605541452:OperatorB: 33
1578605546457:Sink: 33
1578605546457:OperatorB: 35
1578605551457:Sink: 35
1578605551457:OperatorB: 37
1578605556460:Sink: 37
1578605556460:OperatorB: 39
1578605561518:Sink: 39
1578605561519:OperatorB: 41
1578605566536:Sink: 41
1578605566536:OperatorB: 43
1578605571547:Sink: 43
1578605571547:OperatorB: 45
1578605576554:Sink: 45
1578605576554:OperatorB: 47
1578605581561:Sink: 47
1578605581562:OperatorB: 49
1578605586568:Sink: 49
1578605586568:OperatorB: 51
1578605591576:Sink: 51
1578605591576:OperatorB: 53
1578605596580:Sink: 53
1578605596580:OperatorB: 55
1578605601586:Sink: 55
1578605601587:OperatorB: 57
1578605606592:Sink: 57
1578605606592:OperatorB: 59
1578605611596:Sink: 59
1578605611596:OperatorB: 61
1578605616602:Sink: 61
1578605616602:OperatorB: 63
1578605621606:Sink: 63
1578605621606:OperatorB: 65
1578605626608:Sink: 65
1578605626608:OperatorB: 67
1578605631613:Sink: 67
1578605631613:OperatorB: 69
1578605636618:Sink: 69
1578605636618:OperatorB: 71

Tags: apache-flink

Solution


Q1

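It is important to understand that ordering guarantees only apply within a single channel. This freedom allows an operator with two inputs to actively choose which input to consume. Think of a hash join: it first consumes one side completely to build the hash table, and then streams the second side to probe that table.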

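In particular, for you this means that there is no ordering guarantee between the two connected channels, because they are still logically and physically separate.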
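To make the hash-join analogy concrete, here is a minimal plain-Java sketch (no Flink APIs involved; HashJoinSketch, Row and join are hypothetical names) of a two-input operator that chooses its own read order: it drains the entire build input into a hash table before it reads a single record of the probe input. Each input is still read in its own order, but how the two inputs are interleaved is entirely the operator's decision.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy two-input operator (hypothetical, not Flink code): it deliberately
// drains the whole build side before it reads anything from the probe side.
public class HashJoinSketch {

    record Row(int key, String value) {}

    static List<String> join(List<Row> buildSide, List<Row> probeSide) {
        // Phase 1: consume the build input completely into a hash table.
        Map<Integer, List<String>> table = new HashMap<>();
        for (Row r : buildSide) {
            table.computeIfAbsent(r.key(), k -> new ArrayList<>()).add(r.value());
        }
        // Phase 2: stream the probe input and look up each key.
        List<String> out = new ArrayList<>();
        for (Row r : probeSide) {
            for (String match : table.getOrDefault(r.key(), List.of())) {
                out.add(match + "-" + r.value());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> build = List.of(new Row(1, "a1"), new Row(2, "a2"));
        List<Row> probe = List.of(new Row(2, "b2"), new Row(1, "b1"), new Row(3, "b3"));
        System.out.println(join(build, probe)); // [a2-b2, a1-b1]
    }
}
```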

Do you have a use case that requires ordering across both inputs?
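If such a use case exists, one common approach (not something connected streams do for you) is to give every element an ordering key and buffer inside the two-input operator until it is safe to emit. The plain-Java sketch below assumes that each input is already sorted by a long key such as a timestamp; OrderedMerge, onElement1, onElement2 and finish are hypothetical names that only mirror the shape of processElement1/processElement2. In a real job the buffers would more likely live in Flink state than in plain fields; that part is not shown.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper: restores one global order across two inputs, assuming
// each input is already sorted by its own key (for example a timestamp).
public class OrderedMerge {

    private final Deque<Long> pending1 = new ArrayDeque<>();
    private final Deque<Long> pending2 = new ArrayDeque<>();
    private final List<Long> out = new ArrayList<>();

    // Mirrors the shape of processElement1 / processElement2.
    void onElement1(long key) {
        pending1.addLast(key);
        drain();
    }

    void onElement2(long key) {
        pending2.addLast(key);
        drain();
    }

    // Emit the smaller head only while BOTH inputs have something buffered;
    // otherwise a smaller key could still arrive on the empty input.
    private void drain() {
        while (!pending1.isEmpty() && !pending2.isEmpty()) {
            if (pending1.peekFirst() <= pending2.peekFirst()) {
                out.add(pending1.pollFirst());
            } else {
                out.add(pending2.pollFirst());
            }
        }
    }

    // At end of input nothing else can arrive, so flush whatever is left.
    List<Long> finish() {
        drain();
        out.addAll(pending1);
        out.addAll(pending2);
        pending1.clear();
        pending2.clear();
        return out;
    }

    public static void main(String[] args) {
        OrderedMerge m = new OrderedMerge();
        // Arrival order differs from key order, as in the question's output.
        m.onElement2(1);
        m.onElement2(3);
        m.onElement1(0);
        m.onElement1(2);
        m.onElement2(5);
        m.onElement1(4);
        System.out.println(m.finish()); // [0, 1, 2, 3, 4, 5]
    }
}
```

The trade-off is buffering: while one input is idle, everything from the other input stays buffered, so a practical version would need some bound or timeout on how long it waits.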

Q2.

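You cannot observe backpressure because you have too little data. On any network channel there are buffers on both the sender side and the receiver side, so you will not see any backpressure until you have saturated both.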
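The effect can be modeled with two bounded queues standing in for the sender-side and receiver-side buffers of one channel. In this plain-Java sketch (BufferedChannelSketch and emittedBeforeBackpressure are hypothetical names), the consumer is stalled, as if it were sleeping inside processElement, and the function counts how many records the source can hand off before a blocking put would stall it. One simplification to keep in mind: the capacities here count records, whereas Flink's network buffers are fixed-size memory segments that each hold many serialized records, so ten small integers per source come nowhere near filling them.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of one channel: a bounded sender-side buffer feeding a
// bounded receiver-side buffer, with the downstream consumer stalled.
public class BufferedChannelSketch {

    // Returns how many records the source can emit before it would block.
    static int emittedBeforeBackpressure(int senderCapacity, int receiverCapacity, int records) {
        BlockingQueue<Integer> sender = new ArrayBlockingQueue<>(senderCapacity);
        BlockingQueue<Integer> receiver = new ArrayBlockingQueue<>(receiverCapacity);
        int emitted = 0;
        for (int i = 0; i < records; i++) {
            // The "network" moves data from sender to receiver while there is room.
            while (!sender.isEmpty() && receiver.remainingCapacity() > 0) {
                receiver.add(sender.poll());
            }
            if (!sender.offer(i)) {
                // Both buffers are full: a blocking put() here is backpressure.
                return emitted;
            }
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emittedBeforeBackpressure(2, 2, 3));  // 3: all records fit
        System.out.println(emittedBeforeBackpressure(2, 2, 10)); // 4: source would block
    }
}
```

With 3 records and a total capacity of 4 the source finishes without ever blocking, which mirrors the first experiment; only once the combined capacity of both buffers is exceeded does backpressure reach the source.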

Edit: regarding your first comment

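Q1: The CoGroupProcessor alternates between its inputs on a best-effort basis to avoid starving either of them. However, while one input is idle, it reads only from the other one. Once the idle input becomes busy again, it may take a short time (< 1 ms) before that stream is picked up again.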
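Here is a toy model of that best-effort alternation in plain Java. It is not Flink's actual input-selection code; AlternatingReader and readAll are hypothetical names, and an idle input is modeled simply as an empty queue. The reader switches its preferred input after every record and falls back to the other input whenever the preferred one has nothing available.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical best-effort alternating reader over two inputs.
public class AlternatingReader {

    static List<String> readAll(Deque<Integer> input1, Deque<Integer> input2) {
        List<String> order = new ArrayList<>();
        boolean preferFirst = true;
        while (!input1.isEmpty() || !input2.isEmpty()) {
            // Try the preferred input; if it is idle (empty), read the other one.
            Deque<Integer> primary = preferFirst ? input1 : input2;
            Deque<Integer> secondary = preferFirst ? input2 : input1;
            Deque<Integer> chosen = primary.isEmpty() ? secondary : primary;
            String label = (chosen == input1) ? "A" : "B";
            order.add(label + chosen.pollFirst());
            // Prefer the other input next time so that neither one starves.
            preferFirst = (chosen != input1);
        }
        return order;
    }

    public static void main(String[] args) {
        Deque<Integer> a = new ArrayDeque<>(List.of(0, 2));
        Deque<Integer> b = new ArrayDeque<>(List.of(1, 3, 5, 7));
        System.out.println(readAll(a, b)); // [A0, B1, A2, B3, B5, B7]
    }
}
```

Once input A runs dry, only B is read, which corresponds to reading from the other input while one is idle.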

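Q2: I adjusted your code, lowered the number of network buffers to 10, removed the sleeps from your sources, and got the following output, which shows backpressure.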

1578560715990:SourceA: 0
1578560715990:SourceB: 1
...
1578560716041:OperatorA: 0 <-- blocks coprocessfunction
...
1578560716280:SourceB: 29127 <-- at this point network buffers are full
1578560721030:Sink: 0 <-- slow processing in coprocess function, no more inputs are generated because of backpressure
1578560721030:OperatorB: 1
1578560726034:Sink: 1
1578560726034:OperatorA: 2 <-- clear alternation between inputs
1578560731038:Sink: 2
1578560731039:OperatorB: 3
1578560736043:Sink: 3
1578560736043:OperatorA: 4
1578560741047:Sink: 4
1578560741047:OperatorB: 5
1578560746051:Sink: 5
...
