首页 > 解决方案 > Flink 滑动计数窗口行为

问题描述

假设我们有这样的数据结构:

Tuple2<ArryaList<Long>, Integer>

第一个字段是一个ArrayList长度为 one 的包含时间戳的字段,而 Integer 字段是一个介于 1 和 40 之间的数字,名为channel。目标是使用相同的键 ( channel) 聚合每 400 条消息并ReduceFunction对其应用(它只是将 400 条消息的时间戳合并到元组的第一个字段中)。我将该channel字段设置为消息的键并创建一个 400 的计数窗口。例如,如果我们有160000条消息作为输入,它应该输出160000/400 = 400行并且计数窗口按需要工作。问题是当我使用滑动计数窗口时,我的预期行为是:

Flink 为每个channel数字创建逻辑窗口并ReduceFunction 第一次应用,如果逻辑窗口的长度达到 400,之后每 100 个输入数据,使用与逻辑窗口的 key 相同的 key,将调用ReduceFunctionfor last 400 消息窗口也是如此,所以我们应该有:

但是运行滑动计数窗口,它会输出 1600 行长度可变的行。(我预计输出长度仅为 400)

要点:长度我是指ArrayList的大小(Tuple2的第一个字段)

我怎样才能证明这种行为并实现我想要的滑动计数窗口?

这是源代码:

DataStream<Tuple2<ArrayList<Long>, Integer>> data ;
data.keyBy(1).countWindow(400, 100)
                 .reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
             @Override
             public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
                 t0.f0.add(t1.f0.get(0));
                 return t0;
             }
         }).writeAsText("results400").setParallelism(1);

更新:根据@DavidAnderson 的建议,我也尝试在ReduceFunstion而不是修改中创建一个新的元组t0,但它产生了相同的输出。

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
                         ArrayList<Long> times = t0.f0;

                         times.addAll(t1.f0);

                         return new Tuple2<>(times, t0.f1) ;
                     }

标签: apache-flinkflink-streaming

解决方案


这是countWindow的实现

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
            .evictor(CountEvictor.of(size))
            .trigger(CountTrigger.of(slide));
}

它的行为与您期望的不太一样。窗口每 100 个元素(幻灯片)触发一次,无论它是否包含 400 个元素(大小)。大小控制最多保留多少个元素。


推荐阅读