首页 > 解决方案 > 如何在 Flink Stream Processing Windowing 中收集迟到的数据

问题描述

考虑我有一个包含事件时间数据的数据流。我想在 8 毫秒的窗口时间内收集输入数据流并减少每个窗口数据。我使用以下代码执行此操作:

aggregatedTuple
          .keyBy( 0).timeWindow(Time.milliseconds(8))
          .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()

Point数据流的key是处理时间的时间戳,映射到处理毫秒的时间戳的最后8个约数,例如1531569851297将映射到1531569851296

但是有可能数据流到达较晚并进入了错误的窗口时间。例如,假设我将窗口时间设置为 8 毫秒。如果数据按顺序进入 Flink 引擎,或者至少延迟小于窗口时间(8 毫秒),那将是最好的情况。但是假设数据流事件时间(也是数据流中的一个字段)以 30 毫秒的延迟到达。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以过滤这么晚的数据。所以我有两个问题:

标签: apache-flinkstream-processingwindowing

解决方案


Flink 有两个不同的相关抽象,它们处理对具有事件时间时间戳的流计算窗口分析的不同方面:水印允许延迟

首先是watermarks,它在处理事件时间数据时发挥作用(无论您是否使用 Windows)。Watermarks 向 Flink 提供有关 event-time 进度的信息,并为您(应用程序编写者)提供一种处理乱序数据的方法。水印与数据流一起流动,每个水印在流中标记一个位置并带有时间戳。水印用作断言,在流中的该点,流现在(可能)在该时间戳之前完成 - 或者换句话说,跟随水印的事件不太可能来自由水印。最常见的水印策略是使用BoundedOutOfOrdernessTimestampExtractor,它假定事件在某个固定的有界延迟内到达。

这现在提供了迟到的定义——时间戳小于水印时间戳的水印之后的事件被认为是迟到的。

窗口 API 提供了允许延迟的概念,默认设置为零。如果允许的延迟大于零,则事件时间窗口的默认触发器将接受延迟事件到其相应的窗口,直到允许延迟的限制。窗口操作将在通常时间触发一次,然后对于每个延迟事件再次触发,直到允许的延迟间隔结束。之后,延迟事件将被丢弃(如果配置了,则将其收集到侧面输出)。

How can I filter data stream as it wants to enter the window and check 
if the data created at the right timestamp for the window?

Flink 的窗口分配器负责将事件分配给适当的窗口——正确的事情会自动发生。将根据需要创建新的窗口实例。

How can I gather such late data in a variable to do some processing on them?

您可以在水印中足够大方以避免出现任何迟到的数据,和/或将允许的迟到配置为足够长以适应迟到的事件。但是请注意,Flink 将被迫保持所有仍在接受迟到事件的窗口打开,这将延迟垃圾收集旧窗口并可能消耗大量内存。

请注意,此讨论假设您要使用时间窗口——例如,您正在使用的 8 毫秒长窗口。Flink 还支持计数窗口(例如将事件分组为 100 个批次)、会话窗口和自定义窗口逻辑。例如,如果您使用计数窗口,水印和迟到不会发挥任何作用。

如果您想要分析的每个键结果,则在应用窗口之前使用 keyBy 按键(例如,按 userId)对流进行分区。例如

stream
  .keyBy(e -> e.userId)
  .timeWindow(Time.seconds(10))
  .reduce(...)

将为每个 userId 生成单独的结果。

更新:请注意,在 Flink 的最新版本中,Windows 现在可以将延迟事件收集到侧面输出。

一些相关文档:

事件时间和水印
允许延迟


推荐阅读