java - Apache Beam/Java,如何设置每个窗口只发送一次数据的窗口/触发器
问题描述
我有一个管道如下:
Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime
.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(options.getWindowDuration()))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes();
PCollectionTuple productProcessorPT = pipeline
.apply(READ_PRODUCT_FROM_PUBSUB.getName(), PubsubIO.readStrings()
.fromSubscription(options.getProductSubscription()))
.apply(PRODUCT_WINDOW.getName(), fixedWindow)
.apply(PROCESS_PRODUCT.getName(), ParDo.of(new ProductProcessor()))
.apply(GROUP_PRODUCT_DATA.getName(), GroupByKey.create())
.apply(COMBINE_PRODUCT_DATA.getName(), ParDo.of(new ProductCombiner())
.withOutputTags(KV_STRING_OBJECTNODE, TupleTagList.of(PIPELINE_ERROR)));
我想要实现的是设置一个窗口/触发器,每 60 秒收集一次数据,然后将数据发送到下一个转换。我怎样才能做到这一点?我不在乎事件时间戳。
上面的代码每 60 秒发送一次数据到下一次转换,但即使没有新数据进入管道,它也会继续触发/发送(相同的)数据。不知道为什么会这样?
解决方案
您可以删除触发,并使用FixedWindows
如下方式每 60 秒发出一次记录
Window<String> fixedWindow = Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())));
这将使用延迟事件的默认触发和处理,这基本上意味着数据在窗口结束时发出,并且所有延迟事件都被忽略。
推荐阅读
- python - 使用 XML ElementTree 从 XML(旧 Excel)文件中提取值
- python - 我想用 WIDF(加权逆文件频率)算法使用 python 代码预处理文本
- cv2 - 基于 arcLength 裁剪轮廓
- digital-ocean - 当 supervisorctl start worker 时服务器停止响应
- dialogflow-es - 如何使用 Dialogflow ES Fulfillment 触发向人类发送电子邮件
- asp.net-mvc - Linq 获取最新日期查询
- powershell - 无法从 PowerShell 触发 .Bat 文件
- java - 怎么给脸书发消息
- reactjs - 通过 List 调用未找到新插入的记录
- reactjs - 底部导航栏动画