java - Apache Beam - 在管道中添加延迟
问题描述
我有一个从 Pub Sub 主题读取并写入 BigQuery 的简单管道。我想在从主题中读取消息和将其写入 BQ 之间引入 5 分钟的延迟。
我想我可以使用触发器来做到这一点,类似于下面的这个,但是消息仍然直接通过,没有延迟。
PCollection<PubsubMessage> windowed_inputEvents =
inputEvents.apply(
Window.<PubsubMessage>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(5)))
.withAllowedLateness(Duration.standardMinutes(1))
.discardingFiredPanes());
是否可以使用触发器创建这样的延迟?
谢谢
解决方案
看起来你混淆了几件事。在您的示例中,您有一个 1 分钟的固定窗口,这意味着在窗口结束时,所有属于窗口一部分的数据元素都会被发出。
触发器基本上是额外的杠杆,您可以利用它们在窗口关闭之前发出数据。窗口期关闭后,触发器不能保存数据。例如,如果窗口在 12:00 和 12:01 之间,并且如果第一个元素在 12:00 出现,那么当窗口在 12:01 关闭时,元素被发射,直到 12 点才被阻止: 05.
为了满足您的要求,您可以做几件事:-
- 增加窗口期的大小,使其长于保留期,然后您可以延迟发出数据元素。
- 如果这在 BigqueryIO 中是不可能的,那么您可以利用 FILE_LOADS 方法将数据批量写入 Bigquery,并且此 API 也可以使用
withTriggeringFrequency
. 更多细节可以在这里找到 - https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda .time.Duration-
推荐阅读
- webpack - 仅在生产版本中对 Webpack terser 进行故障排除
- linux - 将命令序列传递给正在运行的容器
- amazon-web-services - AWS lambda 只读文件系统错误,使用 docker 镜像存储 ML 模型
- python - 截屏并使用枕头将其转换为灰度图像的最佳方法是什么?
- gstreamer - 将 uridecodebin 动态添加到合成器
- java - 如何使用 Jsoup 从 HTML 中获取信息?
- qt - QPropetyAnimation 在 mosemove 事件中未按预期工作
- python - 在 Python 中一次只能单击一个按钮
- salesforce - 如何将 Amazon Alexa 与 Salesforce Einstein Bot 集成
- machine-learning - 如何理解多类逻辑回归的 ROC?