apache-flink - FLINK - 将 SQL 窗口定期刷新元素以进行处理
问题描述
如果 TUMBLE 窗口会定期计算并发出要处理的元素,我会感到困惑。示例我有一个预计在 10 秒间隔内工作的查询。
select id, key from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ;
现在让我们说:应用程序接收事件
- E1 @10:00:00
- E2 @10:00:05
- E3 @12:00:10
如您所见,E1 和 E2 在 5 秒内到达,E3 在 @12:00:15 到达。
- 当 E1 和 E2 被发出进行处理时,你能帮我吗?会是@10:00:11 吗?或者 E3 什么时候来,然后查询将评估窗口并发出?
- 如果是在 E3 之后,那么有什么方法可以确保每 10 秒执行一次查询?
感谢您对此的帮助。
解决方案
如果您正在使用事件时间处理,则在水印通过 10:00:10 时将发出在 10:00:10 结束的窗口。如果水印以通常的有界无序方式完成,并且如果没有其他事件,则水印将不会前进,直到 E3 被处理。
如果您需要考虑空闲的水印策略,我相信您唯一的选择是使用 DataStream API 创建流并应用处理空闲源的水印,然后将 DataStream 转换为 Table。
请注意,.withIdleness(...)
所做的是将流标记为空闲,这可以防止该流保留水印。如果有其他活动流,这解决了一个空闲流阻碍整个作业的问题。如果您希望在完全没有发生任何事情的情况下继续水印,您需要做一些更激烈的事情。
理想的解决方案是让来自同一来源的 keepalive 消息,以便您知道空闲是真实的,而不是中断。如果做不到这一点,请参阅ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor示例,了解如何使用计时器检测空闲状态并根据时间的流逝而不是新事件的到来来推进水印。(请注意,此示例尚未更新为使用新WatermarkStrategy
界面。)
推荐阅读
- r - jupyter notebook R cell执行速度很慢
- java - 在 WAS for Java 8 中生成战争文件和运行战争文件时 .classpath 的重要性
- sql-server - 如何在 SQL Server 中使用 CASE 组合两个子查询
- java - 在 JUnit 5 参数化测试中。CSV 输入。有没有办法通过 Double.NaN,Double.POSITIVE_INFINITY?
- django - 在 Django 模板中设置组
- c# - .NetCore 3.1API 无法加载文件或程序集“Microsoft.AspNetCore.Hosting.Abstractions”
- java - JpaRepository 中的existById 返回 NullPointerException
- python - TypeError: preprocess() 得到了一个意外的关键字参数“shave”
- javascript - 为表中的每个 td 添加唯一的事件侦听器
- css - SVG组中的响应间距?