首页 > 解决方案 > 带有 tumblingWindow 的 APACHE FLINK AggregateFunction 来计算事件,但如果没有事件发生,也发送 0

问题描述

我需要计算翻滚窗口内的事件。但如果窗口内没有事件,我也想发送值为 0 的事件。

就像是。

  1. 窗口计数:5
  2. 窗口计数:0
  3. 窗口计数:0
  4. 窗口计数:3
  5. 窗口计数:0 ...
import com.google.protobuf.Message;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.skydivin4ng3l.cepmodemon.models.events.aggregate.AggregateOuterClass;

public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(T event, Long accumulator) {
        return accumulator + 1L;
    }

    @Override
    public AggregateOuterClass.Aggregate getResult(Long accumulator) {
        return AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
    }

    @Override
    public Long merge(Long accumulator1, Long accumulator2) {
        return accumulator1 + accumulator2;
    }
}

并在这里使用

DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new BasicCounter<MonitorOuterClass.Monitor>());

TimeCharacteristics 是摄取时间

我读到了一个 TiggerFunction,它可能会检测聚合流是否在 x 时间后收到了一个事件,但我不确定这是否是正确的方法。

我预计聚合会发生,即使窗口内根本没有事件。也许有一个我不知道的设置?

感谢任何提示。

标签: apache-flinkflink-streaming

解决方案


我按照@David-Anderson 的建议选择了选项 1:

这是我的事件生成器:

public class EmptyEventSource implements SourceFunction<MonitorOuterClass.Monitor> {

    private volatile boolean isRunning = true;

    private final long delayPerRecordMillis;

    public EmptyEventSource(long delayPerRecordMillis){
        this.delayPerRecordMillis = delayPerRecordMillis;
    }

    @Override
    public void run(SourceContext<MonitorOuterClass.Monitor> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(MonitorOuterClass.Monitor.newBuilder().build());

            if (delayPerRecordMillis > 0) {
                Thread.sleep(delayPerRecordMillis);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

和我调整后的 AggregateFunction:

public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(T event, Long accumulator) {
        if(((MonitorOuterClass.Monitor)event).equals(MonitorOuterClass.Monitor.newBuilder().build())) {
            return accumulator;
        }

        return accumulator + 1L;
    }

    @Override
    public AggregateOuterClass.Aggregate getResult(Long accumulator) {
        AggregateOuterClass.Aggregate newAggregate = AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
        return newAggregate;
    }

    @Override
    public Long merge(Long accumulator1, Long accumulator2) {
        return accumulator1 + accumulator2;
    }
}

像这样使用它们:

DataStream<MonitorOuterClass.Monitor> someEntryStream = env.addSource(currentConsumer);
DataStream<MonitorOuterClass.Monitor> triggerStream = env.addSource(new EmptyEventSource(delayPerRecordMillis));
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
                        .union(triggerStream)
                        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .aggregate(new BasicCounter<MonitorOuterClass.Monitor>());

推荐阅读