google-cloud-dataflow - 为 10 分钟窗口指定正确触发 + 5 分钟延迟缓冲区仅产生 1 个结果
问题描述
我正在创建一个管道,它摄取无界数据源并进行聚合计算。计算在 10 分钟窗口内完成,基于事件时间和 5 分钟缓冲区用于迟到事件。我想让聚合结果在 10 分钟窗口和 5 分钟缓冲区过去后只发出一次。
我不知道如何使窗口只发出一次结果。我相信正确的方法是使用AfterWatermark
触发器,但如果我使用withLateFirings()
,结果将在窗口过去后和后期触发持续时间过去后发出两次。如果不使用延迟触发,延迟事件将不会包含在计算中,这不符合我的要求。
public class WindowFactory {
private static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
public static Window<Message> getMessageFixedWindow(Duration duration) {
return Window.<Message>into(FixedWindows.of(duration))
.triggering(
AfterWatermark
.pastEndOfWindow()
.withLateFirings(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(FIVE_MINUTES)))
.discardingFiredPanes()
.withAllowedLateness(FIVE_MINUTES);
}
}
请建议我在 10 分钟窗口和 5 分钟缓冲后仅产生 1 个结果的好方法。
解决方案
您现在设置的内容将触发两次,一次是在水印超过窗口末尾时,一次是在延迟数据缓冲区窗口关闭时。
仅使用触发器无法禁用窗口末尾的第一次触发。但是,您可以检测到您正在看到第一次射击并忽略它。通过检查Pane.IsLast()。
@ProcessElement
public void processElement(ProcessContext c) {
if (!c.pane().isLast()) {
return;
}
}
对于没有延迟数据的情况,您不能让系统在窗口结束时触发。系统不知道延迟数据是否会在此时到达。不过,我不认为你是专门问这个的,我只是想提一下。
推荐阅读
- python - 在 Python 中查看辅助 GET 请求?
- reactjs - 反应媒体查询不会改变网格列
- tabs - 添加内容安全标头后,Microsoft Teams Bot 选项卡未加载
- javascript - 检查数组的每个对象中是否存在键值对
- python - 在 django 中显示关于模型的数据库中的表名
- python - Python:使用 Selenium 的 xpaths 中的正则表达式
- actionscript-3 - AS3:ScrollRect 不从两点移动而不跳帧内容
- amazon-web-services - Istio 服务网格设置中的 gRPC GOAWAY 错误
- python - 使用变量名重命名 SQLite 中的表
- python - 如何在使用 google colab 时修复“RuntimeError:无法打开 shape_predictor_68_face_landmarks.dat”?