apache-flink - 带有 tumblingWindow 的 APACHE FLINK AggregateFunction 来计算事件,但如果没有事件发生,也发送 0
问题描述
我需要计算翻滚窗口内的事件。但如果窗口内没有事件,我也想发送值为 0 的事件。
就像是。
- 窗口计数:5
- 窗口计数:0
- 窗口计数:0
- 窗口计数:3
- 窗口计数:0 ...
import com.google.protobuf.Message;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.skydivin4ng3l.cepmodemon.models.events.aggregate.AggregateOuterClass;
public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(T event, Long accumulator) {
return accumulator + 1L;
}
@Override
public AggregateOuterClass.Aggregate getResult(Long accumulator) {
return AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
}
@Override
public Long merge(Long accumulator1, Long accumulator2) {
return accumulator1 + accumulator2;
}
}
并在这里使用
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new BasicCounter<MonitorOuterClass.Monitor>());
TimeCharacteristics 是摄取时间
我读到了一个 TiggerFunction,它可能会检测聚合流是否在 x 时间后收到了一个事件,但我不确定这是否是正确的方法。
我预计聚合会发生,即使窗口内根本没有事件。也许有一个我不知道的设置?
感谢任何提示。
解决方案
我按照@David-Anderson 的建议选择了选项 1:
这是我的事件生成器:
public class EmptyEventSource implements SourceFunction<MonitorOuterClass.Monitor> {
private volatile boolean isRunning = true;
private final long delayPerRecordMillis;
public EmptyEventSource(long delayPerRecordMillis){
this.delayPerRecordMillis = delayPerRecordMillis;
}
@Override
public void run(SourceContext<MonitorOuterClass.Monitor> sourceContext) throws Exception {
while (isRunning) {
sourceContext.collect(MonitorOuterClass.Monitor.newBuilder().build());
if (delayPerRecordMillis > 0) {
Thread.sleep(delayPerRecordMillis);
}
}
}
@Override
public void cancel() {
isRunning = false;
}
}
和我调整后的 AggregateFunction:
public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(T event, Long accumulator) {
if(((MonitorOuterClass.Monitor)event).equals(MonitorOuterClass.Monitor.newBuilder().build())) {
return accumulator;
}
return accumulator + 1L;
}
@Override
public AggregateOuterClass.Aggregate getResult(Long accumulator) {
AggregateOuterClass.Aggregate newAggregate = AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
return newAggregate;
}
@Override
public Long merge(Long accumulator1, Long accumulator2) {
return accumulator1 + accumulator2;
}
}
像这样使用它们:
DataStream<MonitorOuterClass.Monitor> someEntryStream = env.addSource(currentConsumer);
DataStream<MonitorOuterClass.Monitor> triggerStream = env.addSource(new EmptyEventSource(delayPerRecordMillis));
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
.union(triggerStream)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new BasicCounter<MonitorOuterClass.Monitor>());
推荐阅读
- java - 使用 Java 通过 API 查询 Azure SQL DB
- swift - 引用泛型 '
' 需要 <...> 中的参数 - docker - Nginx 容器不会将文件复制到我的主机
- javabeans - 将 Typesafe 配置映射到 bean 类 (ConfigBeanFactory) 时,有没有办法标记拼写错误的属性?
- javascript - JavaScript 在填充“选择”之前按字母顺序(使用巴西葡萄牙语重音词)获取和排序
- xslt - XSLT - 对 XML 元素的子集进行排序,同时将未排序的元素保留在其位置
- python - 从具有基于其他数据框列的多索引的数据框中选择行
- python - Python Multiprocessing:如何正确设置 max_workers 的数量?
- python - 使用 keras 进行 RNN 时如何分配种子编号?
- javascript - React - 动画数组中的第一个新元素