apache-flink - Apache Flink - 如何结合 AssignerWithPeriodicWatermark 和 AssignerWithPunctuatedWatermark?
问题描述
用例:使用 EventTime 并从 Kafka 的记录中提取时间戳。
myConsumer.assignTimestampsAndWatermarks(new MyTimestampEmitter());
...
stream
.keyBy("platform")
.window(TumblingEventTimeWindows 5 mins))
.aggregate(AggFunc(), WindowFunc())
.countWindowAll(size)
.apply(someFunc)
.addSink(someSink);
我想要什么:Flink 为每个记录提取时间戳并在初始间隔(例如 20 秒)内发出水印,然后它可以定期发出水印(例如每 10 秒)。
原因:如果我使用PeriodicWatermark,一开始 Flink 只会在一段时间后才会发出水印,并且我的第一个 5 分钟窗口中的计数是错误的 - 远大于后续窗口中的计数。我有一个解决方法将 setAutoWatermarkInterval 设置为 100 毫秒,但这不是必需的。
目前,我必须使用AssignerWithPeriodicWatermark或AssignerWithPunctuatedWatermark。我如何实施这种组合策略的方法?谢谢。
解决方案
在使用您的水印生成器做一些不寻常的事情之前,我会仔细检查您是否正确诊断出这种情况。总的来说,事件时间窗口的行为应该是确定性的,并且如果呈现相同的输入,总是会产生相同的结果。如果您获得的第一个窗口的结果取决于生成水印的频率,这表明当水印更频繁地到达时您可能有延迟事件被丢弃,并且能够在水印较少时被包括在内经常。也许您的水印没有正确说明您的事件所经历的实际无序程度?或者您的水印可能基于 System.currentTimeMillis(),而不是事件时间戳?
此外,第一个时间窗口与其他时间窗口不同是正常的,因为时间窗口与纪元对齐,而不是与第一个事件对齐。当然,这样做的效果是第一个窗口比所有其他窗口覆盖的时间段更短,因此您应该期望它包含更少的事件,而不是更多。
将 setAutoWatermarkInterval 设置为 100 毫秒是一件非常正常的事情。但是,如果您真的想避免这种情况,您可以考虑使用 AssignerWithPunctuatedWatermarks,它最初为每个事件返回一个水印,然后在适当的时间间隔后,不太频繁地返回水印。
在标点水印分配器中,extractTimestamp 和 checkAndGetNextWatermark 方法都会为每个事件调用。您可以在分配器中使用一些瞬态(非 flink)状态来跟踪第一个事件的时间,或对事件进行计数,并在 checkAndGetNextWatermark 中使用该信息最终退出并停止为每个事件生成水印(通过有时从 checkAndGetNextWatermark 返回 null,而不是 Watermark)。每当重新启动时,您的应用程序将始终恢复为每个事件生成水印。
这不会产生具有周期性和标点分配器的所有特征的分配器,它只是一个自适应标点分配器。
推荐阅读
- php - 处理一些从 PHP 后端到 AJAX 的数据
- java - 带有 MediaView 的 ToogleButton。JavaFX/Java
- python - python中的字符串格式删除之前的空格:以最低效的方式
- python - OverflowError: factorial() 参数不应超过 9223372036854775807
- haskell - 不同数据类型的 Haskell 模式匹配
- javascript - 获取 datetimepicker 值以将其放入输入值
- php - PHP:将数组值添加到
- asp.net - Asp.net 部署问题
- mysql - mysql不想启动
- xml - 使用 express JS 在 localhost 服务器上显示 XML 文件