首页 > 解决方案 > Apache Flink - 如何结合 AssignerWithPeriodicWatermark 和 AssignerWithPunctuatedWatermark?

问题描述

用例:使用 EventTime 并从 Kafka 的记录中提取时间戳。

myConsumer.assignTimestampsAndWatermarks(new MyTimestampEmitter());
...
stream
        .keyBy("platform")
        .window(TumblingEventTimeWindows 5 mins))
        .aggregate(AggFunc(), WindowFunc())
        .countWindowAll(size)
        .apply(someFunc)
        .addSink(someSink);

我想要什么:Flink 为每个记录提取时间戳并在初始间隔(例如 20 秒)内发出水印,然后它可以定期发出水印(例如每 10 秒)。

原因:如果我使用PeriodicWatermark,一开始 Flink 只会在一段时间后才会发出水印,并且我的第一个 5 分钟窗口中的计数是错误的 - 远大于后续窗口中的计数。我有一个解决方法将 setAutoWatermarkInterval 设置为 100 毫秒,但这不是必需的。

目前,我必须使用AssignerWithPeriodicWatermarkAssignerWithPunctuatedWatermark。我如何实施这种组合策略的方法?谢谢。

标签: apache-flinkflink-streaming

解决方案


在使用您的水印生成器做一些不寻常的事情之前,我会仔细检查您是否正确诊断出这种情况。总的来说,事件时间窗口的行为应该是确定性的,并且如果呈现相同的输入,总是会产生相同的结果。如果您获得的第一个窗口的结果取决于生成水印的频率,这表明当水印更频繁地到达时您可能有延迟事件被丢弃,并且能够在水印较少时被包括在内经常。也许您的水印没有正确说明您的事件所经历的实际无序程度?或者您的水印可能基于 System.currentTimeMillis(),而不是事件时间戳?

此外,第一个时间窗口与其他时间窗口不同是正常的,因为时间窗口与纪元对齐,而不是与第一个事件对齐。当然,这样做的效果是第一个窗口比所有其他窗口覆盖的时间段更短,因此您应该期望它包含更少的事件,而不是更多。

将 setAutoWatermarkInterval 设置为 100 毫秒是一件非常正常的事情。但是,如果您真的想避免这种情况,您可以考虑使用 AssignerWithPunctuatedWatermarks,它最初为每个事件返回一个水印,然后在适当的时间间隔后,不太频繁地返回水印。

在标点水印分配器中,extractTimestamp 和 checkAndGetNextWatermark 方法都会为每个事件调用。您可以在分配器中使用一些瞬态(非 flink)状态来跟踪第一个事件的时间,或对事件进行计数,并在 checkAndGetNextWatermark 中使用该信息最终退出并停止为每个事件生成水印(通过有时从 checkAndGetNextWatermark 返回 null,而不是 Watermark)。每当重新启动时,您的应用程序将始终恢复为每个事件生成水印。

这不会产生具有周期性和标点分配器的所有特征的分配器,它只是一个自适应标点分配器。


推荐阅读