首页 > 解决方案 > Apache Flink 减少了许多值而不是一个值

问题描述

我正在尝试在 WindowedStream 上实现减少,如下所示:

                .keyBy(t -> t.key)
            .timeWindow(Time.of(15, MINUTES), Time.of(1, MINUTES))
            .reduce(new ReduceFunction<TwitterSentiments>() {
                @Override
                public TwitterSentiments reduce(TwitterSentiments t2, TwitterSentiments t1) throws Exception {
                    t2.positive += t1.positive;
                    t2.neutral += t1.neutral;
                    t2.negative += t1.negative;

                    return t2;
                }
            });

我遇到的问题是,当我调用 stream.print() 时,我得到了许多值(看起来像每个 TwitterSentiments 对象一个,而不是单个聚合对象。

我也尝试过使用这样的 AggregationFunction ,但存在同样的问题:

                .aggregate(new AggregateFunction<TwitterSentiments, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
                @Override
                public Tuple3<Long, Long, Long> createAccumulator() {
                    return new Tuple3<Long, Long, Long>(0L,0L,0L);
                }

                @Override
                public Tuple3<Long, Long, Long> add(TwitterSentiments ts, Tuple3<Long, Long, Long> accumulator) {
                    return new Tuple3<Long, Long, Long>(
                            accumulator.f0 + ts.positive.longValue(),
                            accumulator.f1 + ts.neutral.longValue(),
                            accumulator.f2 + ts.negative.longValue()
                    );
                }

                @Override
                public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator) {
                    return accumulator;
                }

                @Override
                public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> accumulator1, Tuple3<Long, Long, Long> accumulator2) {
                    return new Tuple3<Long, Long, Long>(
                            accumulator1.f0 + accumulator2.f0,
                            accumulator1.f1 + accumulator2.f1,
                            accumulator1.f2 + accumulator2.f1);
                }
            });

stream.print() 在这些聚合之后仍然会输出许多记录的原因是什么?

标签: javabigdataapache-flinkflink-streaming

解决方案


如果您不需要每个键的结果,则可以使用 timeWindowAll 生成单个结果。但是,timeWindowAll 不会并行运行。如果您想以更具可扩展性的方式计算结果,您可以这样做:

    .keyBy(t -> t.key)
    .timeWindow(<time specification>)
    .reduce(<reduce function>)
    .timeWindowAll(<same time specification>)
    .reduce(<same reduce function>)

您可能希望 Flink 的运行时足够智能,可以为您执行此并行预聚合(假设您使用的是 ReduceFunction 或 AggregateFunction),但事实并非如此。


推荐阅读