首页 > 解决方案 > 带有 onEventTime 触发器的 Flink 会话窗口?

问题描述

我想在 Flink 中创建一个基于 EventTime 的会话窗口,以便在新消息的事件时间比创建窗口的消息的事件时间大 180 秒以上时触发。

例如:

t1(0 seconds)  : msg1  <-- This is the first message which causes the session-windows to be created
t2(13 seconds) : msg2
t3(39 seconds) : msg3
.
.
.
.
t7(190 seconds) : msg7 <-- The event time (t7) is more than 180 seconds than t1 (t7 - t1 = 190), so the window should be triggered and processed now.
t8(193 seconds) : msg8 <-- This message, and all subsequent messages have to be ignored as this window was processed at t7

我想创建一个触发器,以便通过适当的水印或 onEventTime 触发器实现上述行为。谁能提供一些例子来实现这一点?

标签: scalaapache-flinkflink-streaming

解决方案


解决此问题的最佳方法可能是使用 ProcessFunction,而不是使用自定义窗口。如果如您的示例所示,事件将按时间戳顺序处理,那么这将非常简单。另一方面,如果您必须处理乱序事件(这在处理事件时间数据时很常见),那么它会稍微复杂一些。(想象一下,时间为 187 的 msg6 在 t8 之后到达。如果可能,并且这会影响您想要产生的结果,那么必须处理这个问题。)

如果事件是有序的,那么逻辑大致如下所示:

使用 AscendingTimestampExtractor 作为水印的基础。

使用 Flink 状态(可能是 ListState)来存储窗口内容。当一个事件到达时,将它添加到窗口并检查它是否已经超过了第一个事件的 180 秒。如果是,则处理窗口内容并清除列表。

如果您的事件可能是无序的,则使用 BoundedOutOfOrdernessTimestampExtractor,并且不要处理窗口的内容,直到 currentWatermark 指示事件时间已超过窗口的开始时间 180 秒(您可以为此使用事件时间计时器) . 触发窗口时不要完全清除列表,而只是删除属于正在关闭的窗口的元素。


推荐阅读