apache-flink - 如何在 Flink Stream Processing Windowing 中收集迟到的数据
问题描述
考虑我有一个包含事件时间数据的数据流。我想在 8 毫秒的窗口时间内收集输入数据流并减少每个窗口数据。我使用以下代码执行此操作:
aggregatedTuple
.keyBy( 0).timeWindow(Time.milliseconds(8))
.reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()
Point:数据流的key是处理时间的时间戳,映射到处理毫秒的时间戳的最后8个约数,例如1531569851297
将映射到1531569851296
。
但是有可能数据流到达较晚并进入了错误的窗口时间。例如,假设我将窗口时间设置为 8 毫秒。如果数据按顺序进入 Flink 引擎,或者至少延迟小于窗口时间(8 毫秒),那将是最好的情况。但是假设数据流事件时间(也是数据流中的一个字段)以 30 毫秒的延迟到达。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以过滤这么晚的数据。所以我有两个问题:
- 我如何过滤要进入窗口的数据流并检查是否在窗口的正确时间戳处创建的数据?
- 如何在变量中收集如此晚的数据以对它们进行一些处理?
解决方案
Flink 有两个不同的相关抽象,它们处理对具有事件时间时间戳的流计算窗口分析的不同方面:水印和允许延迟。
首先是watermarks,它在处理事件时间数据时发挥作用(无论您是否使用 Windows)。Watermarks 向 Flink 提供有关 event-time 进度的信息,并为您(应用程序编写者)提供一种处理乱序数据的方法。水印与数据流一起流动,每个水印在流中标记一个位置并带有时间戳。水印用作断言,在流中的该点,流现在(可能)在该时间戳之前完成 - 或者换句话说,跟随水印的事件不太可能来自由水印。最常见的水印策略是使用BoundedOutOfOrdernessTimestampExtractor,它假定事件在某个固定的有界延迟内到达。
这现在提供了迟到的定义——时间戳小于水印时间戳的水印之后的事件被认为是迟到的。
窗口 API 提供了允许延迟的概念,默认设置为零。如果允许的延迟大于零,则事件时间窗口的默认触发器将接受延迟事件到其相应的窗口,直到允许延迟的限制。窗口操作将在通常时间触发一次,然后对于每个延迟事件再次触发,直到允许的延迟间隔结束。之后,延迟事件将被丢弃(如果配置了,则将其收集到侧面输出)。
How can I filter data stream as it wants to enter the window and check
if the data created at the right timestamp for the window?
Flink 的窗口分配器负责将事件分配给适当的窗口——正确的事情会自动发生。将根据需要创建新的窗口实例。
How can I gather such late data in a variable to do some processing on them?
您可以在水印中足够大方以避免出现任何迟到的数据,和/或将允许的迟到配置为足够长以适应迟到的事件。但是请注意,Flink 将被迫保持所有仍在接受迟到事件的窗口打开,这将延迟垃圾收集旧窗口并可能消耗大量内存。
请注意,此讨论假设您要使用时间窗口——例如,您正在使用的 8 毫秒长窗口。Flink 还支持计数窗口(例如将事件分组为 100 个批次)、会话窗口和自定义窗口逻辑。例如,如果您使用计数窗口,水印和迟到不会发挥任何作用。
如果您想要分析的每个键结果,则在应用窗口之前使用 keyBy 按键(例如,按 userId)对流进行分区。例如
stream
.keyBy(e -> e.userId)
.timeWindow(Time.seconds(10))
.reduce(...)
将为每个 userId 生成单独的结果。
更新:请注意,在 Flink 的最新版本中,Windows 现在可以将延迟事件收集到侧面输出。
一些相关文档:
推荐阅读
- gradle - Gradle zip task fails because zip does not exist
- java - 如何将 Web 元素转换为 JSON 字符串?
- math - 为给定数量的整数生成唯一值的配对函数
- python - 正则表达式从字符串中查找解析瓶大小(例如 750ML)
- vpn - Cisco ASA IPsec 隧道断开向所有主机发送 RST
- amazon-emr - 在 Airflow EMR 运算符中使用 Json 输入变量
- openssl - 我应该在 rsa_pub_enc 和 rsa_pub_dec OpenSSL 函数中使用什么?
- html - 如何使用选项卡自动完成 WebStorm 在 div 中编写多类?
- javascript - 在阻止 html 文档正文中的脚本标签之前,是否会加载异步脚本标签?
- android - 从 Android ContactsContract.Contacts 查询所有联系人的许多数据时如何提高性能?