apache-flink - Apache Flink:即使没有输入记录到达给定聚合窗口,也基于键控状态在 Flink 中发出输出记录
问题描述
我正在尝试将 Apache Flink 用于 IoT 应用程序。我有一堆设备可以处于几种状态之一。当设备更改状态时,它会发出一条消息,其中包含事件时间戳及其更改的状态。对于一台设备,这可能如下所示:
{Device_id:1,Event_Timestamp:9:01,状态:STATE_1}
{Device_id:1,Event_Timestamp:9:03,状态:STATE_2}
对于每个设备,我需要为给定的五分钟窗口内设备在每个状态所花费的时间生成一个五分钟的聚合。为了做到这一点,我计划使用键控状态来存储每个设备的最后状态更新,以便我知道设备在聚合窗口开始时处于什么状态。例如,假设 id 为“1”的设备有一个键控状态值,表示它在 8:58 进入“STATE_2”。然后 9:00 - 9:05 窗口的聚合输出将如下所示(基于上面的两个示例事件):
{Device_id:1,时间戳:9:00,状态:STATE_1,持续时间:120 秒}
{Device_id:1,时间戳:9:00,状态:STATE_2,持续时间:180 秒}
我的问题是:如果窗口有事件,Flink 只会为给定的 device_id 打开一个窗口。这意味着如果设备超过 5 分钟没有更改状态,则不会有任何记录进入流,因此不会打开窗口。但是,我需要发出一条记录,说明设备在当前状态下花费了整整五分钟,具体取决于存储在键控状态中的内容。例如,Flink 应该发出 9:05-9:10 的记录,表明 id 为“1”的设备在“STATE_2”中花费了全部 300 秒。
有没有办法输出每个设备在给定状态下花费五分钟聚合窗口的时间量的记录,即使状态在这五分钟内没有改变,因此设备不发送任何事件?如果没有,是否有任何解决方法可以用来获取我的应用程序所需的输出事件?
解决方案
实现这一点的一种直接方法是使用 ProcessFunction 而不是窗口。您可以保留任何对您的应用程序方便的键控状态,并使用计时器触发生成定期报告。
推荐阅读
- python - 交叉验证网格搜索背后的理论
- reporting-services - 在具有多个列的列组中编写 SUM 语句
- python - Selenium Python - 选择数据范围
- google-apps-script - Google Apps 脚本 - 如何从段落中获取文本?
- android - Xamarin.Forms.3.0 错误 Xamarin.Forms.Build.Tasks.GetTasksAbi
- kubernetes - Kubernetes 命名空间默认服务帐户
- mahout - What is the right way to increment mahout recommender model?
- c++ - 如何检测 /dev/ttyGS0 是否已连接
- string - 在systemverilog中将变量添加到字符串
- angular - Ngxs - 使用 api 的正确方式?