apache-flink - Apache Flink - CoFlatMapFunction 不按事件时间顺序处理事件
问题描述
我配置了事件时间处理,并有一个带有CoFlatMapFunction
. 我正在编写一个测试用例,但我看到它FlatMapFunction
没有调用它的方法flatMap1()
并且没有flatMap2()
按事件时间顺序调用事件。
一些伪代码来澄清
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
DataStream<Integer> evenStream = env.addSource(new SourceFunction<Integer>(){
public void run(SourceContext<Integer> ctxt){
for (i=0; i < 20; i=i+2){
ctxt.collectWithTimestamp(i, i);
ctxt.emitWatermark(i);
}
}
}
)
DataStream<Integer> oddStream = env.addSource(new SourceFunction<Integer>(){
public void run(SourceContext<Integer> ctxt){
for (i=1; i < 21; i=i+2){
ctxt.collectWithTimestamp(i, i); // Using i as timestamp and watermark for this sample code, but in real code, I am using using timestamp of real event
ctxt.emitWatermark(i);
}
}
}
)
evenStream
.connect(oddStream)
.flatMap(new CoFlatMapFunction<Integer, Integer, Integer>(){
public void flatMap1(Integer evenNumber, Collector<Integer> out){
System.out.println(evenNumber);
}
public void flatMap2(Integer oddNumber, Collector<Integer> out){
System.out.println(oddNumber);
}
}
);
当我运行它时,我希望它打印:
0,1,2,3,4....21
这是因为我交替设置偶数和奇数的时间戳。换句话说,0 具有最低的时间戳,其次是 1,其次是 2,以此类推。
但它首先打印所有偶数,然后是奇数。
总之,我希望按照我在事件中设置的时间戳顺序调用 flatMap1() 和 flatMap2()。但这并没有发生。
解决方案
Flink 的协同函数(连接流上的函数)不保证调用它们的方法的顺序。每当事件可从任一输入获得时,都会调用方法(例如flatMap1()
和)。flatMap2()
在您的示例中,偶数源生成的数据量太小,以至于当奇数到达时所有数据都已处理。
那么,事件时间处理如何为协同功能工作?
协函数的水印始终是两个输入的最小水印。对于 aCoFlatMapFunction
这并不重要,因为您既不能读取当前水印也不能读取记录的时间戳。但是,CoProcessFunction
您可以访问两者,并且可以注册在水印到达某个时间点时调用的计时器。如果要在事件时间对传出流进行排序,则需要缓冲传入事件(处于状态),并且当水印进行时,您可以按顺序发出所有记录,直到水印上的时间。
推荐阅读
- raku - 使用 Metamodel::ConcreteRoleHOW.new_type 时出错
- google-cloud-dataflow - Beam 管道在 GroupByKey 与窗口后不产生任何输出,我得到内存错误
- neo4j - NEO4J Cypher 查找具有多个关系计数的节点
- matlab - Why is a variable undefined when trying to analyse .bat file in MATLAB?
- android - 如何在 iOS 和 Android 的给定时间检测用户是否仅在一台设备上处于活动状态
- json - Installing gem json 2.2.0 failed - windows
- elasticsearch - How to remove multiple fields from index?
- sql - Postgresql:从 EXECUTE 输出导出到 csv 数据
- xcode - Xcode 10.2, iOS 12.2 : Application installed over OTA(Enterprise) not opening
- django - 使用 Django 在谷歌应用引擎中部署时,PyYAML 有多重要?