首页 > 解决方案 > Apache flink - 早期触发窗口实现问题 - 收到重复的元素

问题描述

我很难理解 flink 窗口原则,如果你能指出我正确的方向,我会非常高兴。

我的目的是计算一个时间间隔内重复事件的数量,如果重复事件的数量大于阈值,则生成警报事件。

据我了解,窗口化非常适合这种情况。

附加要求是,如果窗口中的重复事件计数为 2,则生成早期警报(即应在不等待窗口结束的情况下生成警报)。

我认为警报事件生成过程窗口函数可用于聚合窗口事件,自定义触发器可用于根据重复事件计数(在水印到达窗口的结束时间戳之前)从窗口发出早期结果。

我正在使用事件时间语义并且对自定义触发器有问题/疑问。

您可以在 gist 中找到实际的实现:https ://gist.github.com/simpleusr/7c56d4384f6fc9f0a61860a680bb5f36

我正在使用键控状态来跟踪窗口中的元素计数encounteredElementsCountState

在收到第一个元素后,我注册EventTimeTimer到窗口端。这应该触发FIRE_AND_PURGE窗口关闭并按预期工作。

如果计数超过阈值,我会尝试触发早期触发。这似乎也成功了,processwindow在此触发后立即调用函数。

问题是,我不得不在不了解原因的情况下将下面的检查插入代码。因为之前收集的元素再次提供给onElement方法:

if (ctx.getCurrentWatermark() < 0) {
            logger.debug(String.format("onElement processing skipped for eventId : %s for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark()));
            return TriggerResult.CONTINUE;
        }

我无法弄清楚原因。我看到的是,当这种情况发生时,水印值是(ctx.getCurrentWatermark()) Long.MIN_VALUE(导致上述检查)。这怎么可能发生?

此检查似乎避免了重复的早期事件生成,但我不知道为什么会发生这种情况以及这种解决方法是否合适。

您能否建议为什么在窗口中处理两次相同的元素?

另一个问题是关于键控状态的使用。这个实现在处理窗口后是否会泄漏任何状态?我正在尝试以触发器的清除方法清除所有使用的状态,但这是否足够?

问候。

标签: apache-flinkflink-streaming

解决方案


每个任务都将 currentWatermark 初始化为 Long.MIN_VALUE,并且这仍然是 currentWatermark 的本地值,直到从该任务的所有输入流中到达更大的水印。希望知道这将帮助您更好地了解正在发生的事情。

就其价值而言,使用 ProcessFunction 实现这种逻辑通常比使用 Window API 更直接。


推荐阅读