apache-beam - 是否可以将复合触发器与带有数据流的微批处理结合使用?
问题描述
我们有一个无限PCollection
PCollection<TableRow> source
的我们要插入到 BigQuery。
每 500,000 条消息或 5 分钟触发一次窗口的简单“按书本”方法是:
source.apply("GlobalWindow", Window.<TableRow>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(500000),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))))
).withAllowedLateness(Duration.standardMinutes(1440)).discardingFiredPanes())
您会认为将以下内容应用于触发的窗口/窗格将允许您将触发的窗格的内容写入 BigQuery:
.apply("BatchWriteToBigQuery", BigQueryIO.writeTableRows()
.to(destination)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withNumFileShards(NUM_FILE_SHARDS)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
但这会产生编译错误An exception occured while executing the Java class. When writing an unbounded PCollection via FILE_LOADS, triggering frequency must be specified
相对简单的解决方法是添加.withTriggeringFrequency(Duration.standardMinutes(5))
到上述内容中,这基本上会使每五分钟或每 N 条消息插入的想法完全无效,并且在这种情况下你也可以摆脱窗口。
有没有办法真正做到这一点?
解决方案
FILE_LOADS
需要触发频率。
如果您想要更多实时结果,那么您可以使用STREAMING_INSERTS
推荐阅读
- ruby-on-rails-4 - 如何从活动管理员中的关联中选择所有记录
- c - 访问 MPI 拓扑中的邻居进程值
- android - 如何在 RecyclerView 适配器中隐藏页脚?
- python - 将变量从一个 python 脚本实时传递到另一个
- java - 将字节写入流时损坏的 pdf 文件
- laravel - Laravel 默认身份验证与令牌身份验证
- kotlin - 为什么要将 val 或 var 放在 kotlin 类构造函数中
- react-native - React Native:点击 FlatList 项目时呈现项目细节的逻辑
- google-apps-script - 我想使用 Google Apps 脚本删除数组中的所有空格
- python - 尝试连接两个模型并适合 Keras 时出现 AssertionError