google-cloud-platform - 默认窗口和默认触发器如何在 Apache Beam 中工作
问题描述
我正在尝试使用默认触发器实现默认窗口以评估行为,但它没有发出任何结果。
根据 Apache Beam:
PCollection 的默认触发器基于事件时间,并在 Beam 的水印通过窗口末尾时发出窗口结果,然后在每次延迟数据到达时触发。
如果您同时使用默认窗口配置和默认触发器,则默认触发器仅发出一次,并且延迟数据将被丢弃。这是因为默认窗口配置的允许延迟值为 0。
我的代码:
Nb_items = lines | beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults() \
| 'print' >> beam.ParDo(PrintFn())
如果我设置触发器,它只会发出数据
Nb_items = lines | 'window' >> beam.WindowInto(window.GlobalWindows(),
trigger=trigger.AfterProcessingTime(10),
accumulation_mode=trigger.AccumulationMode.DISCARDING) \
| 'CountGlobally' >> beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults() \
| 'print' >> beam.ParDo(PrintFn())
如何在不设置触发器的情况下观察默认行为?
是组合变换中的问题吗?
如果您的输入 PCollection 使用默认的全局窗口,则默认行为是返回包含一项的 PCollection。该项目的值来自您在应用组合时指定的组合函数中的累加器
解决方案
当前的问题是水印永远不会到达GlobalWindow
. 要拥有默认触发器,您可以使用水印可以到达末尾的任何其他窗口,例如:'window' >> beam.WindowInto(window.FixedWindows(10))
正如 Guillaume 正确地问的那样,如果您在 Batch 上运行,触发器基本上会被忽略。
推荐阅读
- java - 为什么这不起作用。两个 for 循环中不能有 if 语句吗?
- java - Java序列化,如何为每个对象识别和命名一个文件?
- java - Shuhart stepview 空对象引用
- powershell - Powershell拆分未返回预期结果
- python - Pandas:按两个类别分组并返回计数
- angular - 需要一个 SOAP 服务器才能在 Angular 中运行
- opengl - 如何使用 OpenGL Shader 模拟“Glow Dodge”混合?
- python - 在 python 中导出具有“类型”和“大小”列的字典
- powerbi - Power BI 中的 DAX - 如何添加度量来计算百分比
- django - 在没有复制粘贴到控制台的情况下对 Django 授权进行 Spotipy