apache-flink - Flink - 如何在状态中聚合
问题描述
我有一个键控数据流,如下所示:
{
summary:Integer
uid:String
key:String
.....
}
我需要在某个时间范围内聚合汇总值,一旦达到特定数字,将汇总和影响汇总的所有 UID 刷新到数据库/日志文件。
第一次刷新后,我想从内存中删除所有 uid,然后立即刷新每个新项目。
所以我尝试了这个聚合函数。
public class AggFunc implements AggregateFunction<Item, Acc, Tuple2<Integer,List<String>>>{
private static final long serialVersionUID = 1L;
@Override
public Acc createAccumulator() {
return new Acc());
}
@Override
public Acc add(Item value, Acc accumulator) {
accumulator.inc(value.getSummary());
accumulator.addUid(value.getUid);
return accumulator;
}
@Override
public Tuple2<Integer,List<String>> getResult(Acc accumulator) {
List<String> newL = Lists.newArrayList(accumulator.getUids());
accumulator.setUids(Lists.newArrayList());
return Tuple2.of(accumulator.getSum(), newL);
}
@Override
public Acc merge(Acc a, Acc b) {
.....
}
}
在聚合过程函数中,我将列表刷新到状态,如果我需要保存到数据库,我将清除状态并在状态中保存标志以指示它。
但这对我来说似乎是弯曲的。我不确定这是否对我有用。
这种情况有更好的解决方案吗?
解决方案
在丰富的函数中使用状态。继续uid
在您的状态和窗口触发刷新值时添加。官方文档中的这个页面有一个例子。
对于您的情况, aListState
会很好地工作。
编辑:
上述解决方案适用于非窗口情况。对于窗口情况,只需使用具有丰富窗口功能的应用功能的聚合
推荐阅读
- microsoft-graph-api - microsoft graph api - 无法创建邮件订阅错误:ServiceUnavailable
- javascript - 为什么我的轮子在点击时会令人讨厌地旋转?
- javascript - 滚动上的类方法
- javascript - 确定页面何时刷新或重新加载
- reactjs - 我正在尝试根据一组值动态设置状态
- javascript - 如何使用来自 JSON 的信息填充数组?
- elasticsearch - 过滤具有内部命中的对象列表
- python-3.x - 如何获得 Scrapy 请求以转到网站的最后一页?
- ios - 苹果推送通知证书更新后,onesignal 令牌停止工作
- sql-server - 计算带有块的结果的列?