首页 > 解决方案 > Flink:评估流的每个传入元素的窗口

问题描述

我有以下形式的预订元素流:

Booking(id=B1, driverId=D1, time=t1, location=l1)
Booking(id=B2, driverId=D2, time=t2, location=l2)

我需要查找每个位置在过去 15 分钟内进行的预订计数。但是,应针对某个位置的任何新预订来评估该窗口。

大致如下:

Assuming `time` field is set as timestamp of record
bookingStream.keyBy(b=>b.location).window(Any window of 15mins).trigger(triggerFunction)

除了trigger function should not be evaluated15 分钟结束时的 , 而是whenever any booking arrives at a location, 和emit the count of booking in last 15min from timestamp of newly arrived booking.

方法:

使用 RichMap 功能,将位置预订的优先级队列维护为托管状态(ValueState),时间戳作为预订的优先级。对于每个到达的元素,首先将其添加到状态,然后从当前到达的元素中删除早于 15 分钟的元素。将优先级队列中剩余元素的计数发送给收集器。

这是正确的方法还是可以通过以更好的方式使用其他一些 flink 结构来实现。

标签: apache-flinkflink-streaming

解决方案


如果您在基于堆的状态后端上运行,那么您的建议应该表现得相当好。但是对于 RocksDB,您将不得不对每次访问的优先级队列进行序列化/反序列化,这可能会相当痛苦。

一种可能在 RocksDB 上表现更好的方法是将当前计数与 ValueState 中的最早时间戳以及 ListState 中的预订集保持一致。RocksDB 状态后端可以在不经过 ser/de 的情况下附加到 ListState,因此您只需在最早的元素太旧时反序列化和重新序列化整个列表。


推荐阅读