java - Flink BoundedOutOfOrdernessGenerator 水印问题
问题描述
我正在尝试构建自己的 a 实现,如Flink 文档BoundedOutOfOrdernessGenerator
中所建议的那样,鉴于水印似乎没有正确更新,我在这样做时遇到了一些问题。Windows 永远不会触发,因为水印没有前进。
这些是一些相关的片段,其中myWatermarkAssigner
是一个WatermarkGenerator
实例:
主流:
dataLeft = dataLeft.assignTimestampsAndWatermarks(myWatermarkAssigner);
dataRight = dataRight.assignTimestampsAndWatermarks(myWatermarkAssigner);
DataStream<MyWindow> output = dataLeft
.keyBy("field")
.coGroup(dataRight.keyBy("field"))
.where(dataL -> dataL.id)
.equalTo(dataR -> dataR.id)
.window(TumblingEventTimeWindows.of(duration))
.apply(myProcessor);
生成器类:
public class WatermarkGenerator<T extends MyEvent> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = 1L;
private long maxOutOfOrderness = 15000; // 15 seconds
private long currentMaxTimestamp;
public WatermarkGenerator(long maxTimeLag) {
this.maxOutOfOrderness = maxTimeLag;
}
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.eventTime.getTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
如果我在里面添加一个简单的日志getCurrentWatermark()
,结果如下:
0
0
0
0
0
1557157602000 // First element just arrived
0
0
0
1557157602000
0
0
0
1557157602000
0
0
0
而且窗户永远不会被触发,我知道那是因为水印有时是 0,有时是正确的值。
这是否与我使用相同的WatermarkGenerator
for dataLeft
and实例dataRight
而我只在 上接收事件有关dataLeft
?
此外,如果我像这样使用系统时间作为水印,Windows 触发器和管道就像一个魅力:
// return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return new Watermark(System.currentTimeMillis() - maxOutOfOrderness);
作为一些最后的说明,我正在使用.setAutoWatermarkInterval(1000L)
并且我已经尝试使用内置BoundedOutOfOrdernessTimestampExtractor
实现,但结果相同。
解决方案
推荐阅读
- angular - Angular 6 - 模拟 authState - 未定义的 valueChanges
- php - 如何使用 Codeigniter/PHP 在运行时动态调整图像大小?
- c++ - pthread_create 中出现错误 4 的 Segfault
- javascript - 嵌入式 youtube 播放列表可实时播放,如电视频道?
- bash - Gitlab-CI:为 Linux 和 Windows 设置环境变量
- laravel - 自定义验证不返回默认错误消息
- c# - 如何从 WPF 中的当前页面调用另一个页面?
- ruby - 如何在 Ruby on Rails 中连接两个不同数据库的表
- c# - 沙盒贝宝没有将用户重定向到成功页面
- terraform - 控制 terraform 类似资源而不在修改列表时重新创建它们的最佳方法