首页 > 解决方案 > Flink 中的早期触发 - 如何使用触发器将早期窗口结果发送到不同的 DataStream

问题描述

我正在使用使用一天的滚动窗口的代码,并且希望每小时将早期结果发送到不同的 DataStream。我知道触发器是一种方法,但并不真正了解它是如何工作的。

当前代码如下:

myStream
     .keyBy(..)
     .window(TumblingEventTimeWindows.of(Time.days(1)))
     .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

在我的理解中,我应该注册一个触发器,然后在它的 onEventTime 方法上获取一个 TriggerContext,我可以从那里将数据发送到标记的输出。但是如何从那里获取 MyAggregateFunction 的当前状态?还是我需要在 onEventTime() 中自己计算?

此外,文档指出"By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner.". 然后我的一天窗口仍然会正确触发,还是我需要以不同的方式触发它?

另一种方法是创建两个不同的运算符 - 一个以 1 小时为窗口,另一个以 1 天为窗口。触发器会是解决此问题的首选方法吗?

标签: triggerswindowapache-flinkflink-streamingearly-return

解决方案


与使用自定义相比Trigger,使用两层窗口会更简单,每小时结果进一步汇总为每日结果。像这样的东西:

hourlyResults = myStream
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

dailyResults = hourlyResults
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

hourlyResults.addSink(...)
dailyResults.addSink(...)

请注意,窗口的结果不是 a KeyedStream,因此您需要再次使用 keyBy ,除非您可以安排利用reinterpretAsKeyedStream( docs )。


推荐阅读