apache-flink - Apache flink - 早期触发窗口实现问题 - 收到重复的元素
问题描述
我很难理解 flink 窗口原则,如果你能指出我正确的方向,我会非常高兴。
我的目的是计算一个时间间隔内重复事件的数量,如果重复事件的数量大于阈值,则生成警报事件。
据我了解,窗口化非常适合这种情况。
附加要求是,如果窗口中的重复事件计数为 2,则生成早期警报(即应在不等待窗口结束的情况下生成警报)。
我认为警报事件生成过程窗口函数可用于聚合窗口事件,自定义触发器可用于根据重复事件计数(在水印到达窗口的结束时间戳之前)从窗口发出早期结果。
我正在使用事件时间语义并且对自定义触发器有问题/疑问。
您可以在 gist 中找到实际的实现:https ://gist.github.com/simpleusr/7c56d4384f6fc9f0a61860a680bb5f36
我正在使用键控状态来跟踪窗口中的元素计数encounteredElementsCountState
在收到第一个元素后,我注册EventTimeTimer
到窗口端。这应该触发FIRE_AND_PURGE
窗口关闭并按预期工作。
如果计数超过阈值,我会尝试触发早期触发。这似乎也成功了,processwindow
在此触发后立即调用函数。
问题是,我不得不在不了解原因的情况下将下面的检查插入代码。因为之前收集的元素再次提供给onElement
方法:
if (ctx.getCurrentWatermark() < 0) {
logger.debug(String.format("onElement processing skipped for eventId : %s for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark()));
return TriggerResult.CONTINUE;
}
我无法弄清楚原因。我看到的是,当这种情况发生时,水印值是(ctx.getCurrentWatermark()) Long.MIN_VALUE
(导致上述检查)。这怎么可能发生?
此检查似乎避免了重复的早期事件生成,但我不知道为什么会发生这种情况以及这种解决方法是否合适。
您能否建议为什么在窗口中处理两次相同的元素?
另一个问题是关于键控状态的使用。这个实现在处理窗口后是否会泄漏任何状态?我正在尝试以触发器的清除方法清除所有使用的状态,但这是否足够?
问候。
解决方案
每个任务都将 currentWatermark 初始化为 Long.MIN_VALUE,并且这仍然是 currentWatermark 的本地值,直到从该任务的所有输入流中到达更大的水印。希望知道这将帮助您更好地了解正在发生的事情。
就其价值而言,使用 ProcessFunction 实现这种逻辑通常比使用 Window API 更直接。
推荐阅读
- c# - 如何加快数据网格视图?
- python - 熊猫 pd.pivot_table 的更快更有效的替代方案?
- php - Symfony 5 Form 在会话中改变用户
- python - Tkinter - 用笔记本改变框架
- sqlite - 如何选择具有最大计数的所有行而不仅仅是第一行?
- java - 如何为 Java JCSG 对象设置颜色?[JavaFX]
- swift - 如何在 swift 5 中将自定义字体设置为 UIdate 选择器?
- java - 嵌套异常是 java.lang.NumberFormatException: For input string: "favicon.ico"]
- amazon-web-services - AWS - 无法监控免费套餐使用情况
- c - 有什么方法可以防止 CPU 在处理程序中被从进程中取出?