triggers - Flink 中的早期触发 - 如何使用触发器将早期窗口结果发送到不同的 DataStream
问题描述
我正在使用使用一天的滚动窗口的代码,并且希望每小时将早期结果发送到不同的 DataStream。我知道触发器是一种方法,但并不真正了解它是如何工作的。
当前代码如下:
myStream
.keyBy(..)
.window(TumblingEventTimeWindows.of(Time.days(1)))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
在我的理解中,我应该注册一个触发器,然后在它的 onEventTime 方法上获取一个 TriggerContext,我可以从那里将数据发送到标记的输出。但是如何从那里获取 MyAggregateFunction 的当前状态?还是我需要在 onEventTime() 中自己计算?
此外,文档指出"By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner."
. 然后我的一天窗口仍然会正确触发,还是我需要以不同的方式触发它?
另一种方法是创建两个不同的运算符 - 一个以 1 小时为窗口,另一个以 1 天为窗口。触发器会是解决此问题的首选方法吗?
解决方案
与使用自定义相比Trigger
,使用两层窗口会更简单,每小时结果进一步汇总为每日结果。像这样的东西:
hourlyResults = myStream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
dailyResults = hourlyResults
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.days(1)))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
hourlyResults.addSink(...)
dailyResults.addSink(...)
请注意,窗口的结果不是 a KeyedStream
,因此您需要再次使用 keyBy ,除非您可以安排利用reinterpretAsKeyedStream
( docs )。
推荐阅读
- html - 垂直幻灯片动画
- html - css 居中代码不适用于 html 和 css 中的相同文件名
- django - 有没有办法为单独的仪表板自定义 django-treebeard 管理界面?
- kotlin - 用类数组表示 json 的 Kotlin 类应该是什么
- numpy-ndarray - numpy.array() 文档不清楚 '*'
- javascript - 使用加密 API 解密数据时如何解决此 OperationError 错误?
- python - python bsoup 删除部分文本
- php - 由于组件创建在不同的文件夹中,Laravel 组件类被忽略
- sql - 在 athena/presto 中将数组(varchar)转换为 varchar
- firebase - Flutter 和 Firebase 将对象添加到列表