首页 > 解决方案 > Flink - 如何在状态中聚合

问题描述

我有一个键控数据流,如下所示:

    {
        summary:Integer
        uid:String
        key:String
        .....
    }

我需要在某个时间范围内聚合汇总值,一旦达到特定数字,将汇总和影响汇总的所有 UID 刷新到数据库/日志文件。

第一次刷新后,我想从内存中删除所有 uid,然后立即刷新每个新项目。

所以我尝试了这个聚合函数。

public class AggFunc implements AggregateFunction<Item, Acc, Tuple2<Integer,List<String>>>{

    private static final long serialVersionUID = 1L;

    @Override
    public Acc createAccumulator() {
        return new Acc());
    }

    @Override
    public Acc add(Item value, Acc accumulator) {
        accumulator.inc(value.getSummary());
        accumulator.addUid(value.getUid);
        return accumulator;
    }

    @Override
    public Tuple2<Integer,List<String>> getResult(Acc accumulator) {
        List<String> newL = Lists.newArrayList(accumulator.getUids());
        accumulator.setUids(Lists.newArrayList());
        return Tuple2.of(accumulator.getSum(), newL);
    }

    @Override
    public Acc merge(Acc a, Acc b) {
        .....
    }

}

在聚合过程函数中,我将列表刷新到状态,如果我需要保存到数据库,我将清除状态并在状态中保存标志以指示它。

但这对我来说似乎是弯曲的。我不确定这是否对我有用。

这种情况有更好的解决方案吗?

标签: apache-flinkflink-streaming

解决方案


在丰富的函数中使用状态。继续uid在您的状态和窗口触发刷新值时添加。官方文档中的这个页面有一个例子。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state

对于您的情况, aListState会很好地工作。

编辑:

上述解决方案适用于非窗口情况。对于窗口情况,只需使用具有丰富窗口功能的应用功能的聚合


推荐阅读