首页 > 解决方案 > Flink BoundedOutOfOrdernessGenerator 水印问题

问题描述

我正在尝试构建自己的 a 实现,如Flink 文档BoundedOutOfOrdernessGenerator中所建议的那样,鉴于水印似乎没有正确更新,我在这样做时遇到了一些问题。Windows 永远不会触发,因为水印没有前进。

这些是一些相关的片段,其中myWatermarkAssigner是一个WatermarkGenerator实例:

主流:

dataLeft = dataLeft.assignTimestampsAndWatermarks(myWatermarkAssigner);
dataRight = dataRight.assignTimestampsAndWatermarks(myWatermarkAssigner);

DataStream<MyWindow> output = dataLeft
    .keyBy("field")
    .coGroup(dataRight.keyBy("field"))
    .where(dataL -> dataL.id)
    .equalTo(dataR -> dataR.id)
    .window(TumblingEventTimeWindows.of(duration))
    .apply(myProcessor);

生成器类:

public class WatermarkGenerator<T extends MyEvent> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = 1L;
    private long maxOutOfOrderness = 15000; // 15 seconds
    private long currentMaxTimestamp;

    public WatermarkGenerator(long maxTimeLag) {
        this.maxOutOfOrderness = maxTimeLag;
    }

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.eventTime.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);

        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

如果我在里面添加一个简单的日志getCurrentWatermark(),结果如下:

0
0
0
0
0
1557157602000 // First element just arrived
0
0
0
1557157602000
0
0
0
1557157602000
0
0
0

而且窗户永远不会被触发,我知道那是因为水印有时是 0,有时是正确的值。

这是否与我使用相同的WatermarkGeneratorfor dataLeftand实例dataRight而我只在 上接收事件有关dataLeft

此外,如果我像这样使用系统时间作为水印,Windows 触发器和管道就像一个魅力:

// return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return new Watermark(System.currentTimeMillis() - maxOutOfOrderness);

作为一些最后的说明,我正在使用.setAutoWatermarkInterval(1000L)并且我已经尝试使用内置BoundedOutOfOrdernessTimestampExtractor实现,但结果相同。

标签: javaapache-flinkflink-streaming

解决方案


推荐阅读