首页 > 解决方案 > Apache Flink - CoFlatMapFunction 不按事件时间顺序处理事件

问题描述

我配置了事件时间处理,并有一个带有CoFlatMapFunction. 我正在编写一个测试用例,但我看到它FlatMapFunction没有调用它的方法flatMap1()并且没有flatMap2()按事件时间顺序调用事件。

一些伪代码来澄清

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.setParallelism(1)

DataStream<Integer> evenStream = env.addSource(new SourceFunction<Integer>(){
         public void run(SourceContext<Integer> ctxt){
                for (i=0; i < 20; i=i+2){
                     ctxt.collectWithTimestamp(i, i);
                     ctxt.emitWatermark(i);
                } 
         }
 }
 )
DataStream<Integer> oddStream  = env.addSource(new SourceFunction<Integer>(){
         public void run(SourceContext<Integer> ctxt){
                for (i=1; i < 21; i=i+2){
                     ctxt.collectWithTimestamp(i, i); // Using i as timestamp and watermark for this sample code, but in real code, I am using using timestamp of real event 
                     ctxt.emitWatermark(i);
                } 
         }
 }
 )

evenStream
   .connect(oddStream)
   .flatMap(new CoFlatMapFunction<Integer, Integer, Integer>(){

        public void flatMap1(Integer evenNumber, Collector<Integer> out){                  
               System.out.println(evenNumber);
        }
        public void flatMap2(Integer oddNumber, Collector<Integer> out){
               System.out.println(oddNumber);
        }

   }
   );

当我运行它时,我希望它打印:

0,1,2,3,4....21

这是因为我交替设置偶数和奇数的时间戳。换句话说,0 具有最低的时间戳,其次是 1,其次是 2,以此类推。

但它首先打印所有偶数,然后是奇数。

总之,我希望按照我在事件中设置的时间戳顺序调用 flatMap1() 和 flatMap2()。但这并没有发生。

标签: apache-flinkflink-streaming

解决方案


Flink 的协同函数(连接流上的函数)不保证调用它们的方法的顺序。每当事件可从任一输入获得时,都会调用方法(例如flatMap1()和)。flatMap2()在您的示例中,偶数源生成的数据量太小,以至于当奇数到达时所有数据都已处理。

那么,事件时间处理如何为协同功能工作?

协函数的水印始终是两个输入的最小水印。对于 aCoFlatMapFunction这并不重要,因为您既不能读取当前水印也不能读取记录的时间戳。但是,CoProcessFunction您可以访问两者,并且可以注册在水印到达某个时间点时调用的计时器。如果要在事件时间对传出流进行排序,则需要缓冲传入事件(处于状态),并且当水印进行时,您可以按顺序发出所有记录,直到水印上的时间。


推荐阅读