首页 > 解决方案 > Apache Beam/Java,如何设置每个窗口只发送一次数据的窗口/触发器

问题描述

我有一个管道如下:

Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
      .triggering(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime
            .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(options.getWindowDuration()))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes();

PCollectionTuple productProcessorPT = pipeline
  .apply(READ_PRODUCT_FROM_PUBSUB.getName(), PubsubIO.readStrings()
    .fromSubscription(options.getProductSubscription()))
  .apply(PRODUCT_WINDOW.getName(), fixedWindow)
  .apply(PROCESS_PRODUCT.getName(), ParDo.of(new ProductProcessor()))
  .apply(GROUP_PRODUCT_DATA.getName(), GroupByKey.create())
  .apply(COMBINE_PRODUCT_DATA.getName(), ParDo.of(new ProductCombiner())
    .withOutputTags(KV_STRING_OBJECTNODE, TupleTagList.of(PIPELINE_ERROR)));

我想要实现的是设置一个窗口/触发器,每 60 秒收集一次数据,然后将数据发送到下一个转换。我怎样才能做到这一点?我不在乎事件时间戳。

上面的代码每 60 秒发送一次数据到下一次转换,但即使没有新数据进入管道,它也会继续触发/发送(相同的)数据。不知道为什么会这样?

标签: javaapache-beam

解决方案


您可以删除触发,并使用FixedWindows如下方式每 60 秒发出一次记录

Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())));

这将使用延迟事件的默认触发和处理,这基本上意味着数据在窗口结束时发出,并且所有延迟事件都被忽略。


推荐阅读