首页 > 解决方案 > 滑动时间窗口的 F​​link 性能问题

问题描述

我正在尝试使用一些网络监视器工作来进行 flink。我的目标是计算dst_ip每个src_ip.

我的以下代码有效,但性能非常糟糕。似乎每个滑动窗口都会重新计算所有事件,但这不是必需的。

例如,我们有事件发生在时间 1 - 600 秒。Flink 可以获得每秒的累加器,所以我们每秒有 600 个累加器。当第一个滑动窗口到期时,flink 只合并 1-300 的累加器,并销毁第二个 1 的累加器。这个窗口也可以在最后一秒之前预先合并 1-299。当第二个滑动窗口到期时,flink 只是合并 2-301 的累加器,并销毁第二个 2 的累加器。以此类推......

这种方式比将事件分配给多个窗口并计算每个窗口的聚合要高效得多。

flink 支持这个吗?我可以通过 flink 自己获得类似的功能吗?

非常感谢!

public static class AverageAccumulator2 {
    String key;
    Set<String> target;
    AverageAccumulator2() {
        target = new HashSet<>();
    }
}

public static class Average2 implements AggregateFunction<ObjectNode, AverageAccumulator2, Tuple3<String, Long, Set<String>>> {
    @Override
    public AverageAccumulator2 createAccumulator() {
        return new AverageAccumulator2();
    }

    @Override
    public AverageAccumulator2 add(ObjectNode value, AverageAccumulator2 accumulator) {
        accumulator.key = value.get("value").get("src_ip").asText();
        accumulator.target.add(value.get("value").get("dst_ip").asText());
        return accumulator;
    }
    @Override
    public Tuple3<String, Long, Set<String>> getResult(AverageAccumulator2 accumulator) {
        return new Tuple3<>(accumulator.key, (long) accumulator.target.size(), accumulator.target);
    }

    @Override
    public AverageAccumulator2 merge(AverageAccumulator2 a, AverageAccumulator2 b) {
        a.target.addAll(b.target);
        return a;
    }
}

final SingleOutputStreamOperator<Tuple3<String, Long, Set<String>> > process2 =
stream.keyBy(value -> value.get("value").get("sip").asText())
                    .timeWindow(Time.seconds(300),Time.seconds(1))
                    .aggregate(new Average2());

标签: apache-flink

解决方案


正如您所观察到的,Flink 不会尝试优化滑动窗口。对于细粒度的滑动,这确实变得非常昂贵。

您可以做的是使用ProcessFunction实现您自己的用于处理状态和计时器的逻辑——您可以按照您的概述来实现它。您将拥有一个 processElement 方法,用于为每个传入记录更新您将用于累积结果的数据结构,以及一个 onTimer 方法,该方法每秒触发一次,将部分结果合并在一起,并将结果发送到下游。


推荐阅读