首页 > 解决方案 > 是否可以将复合触发器与带有数据流的微批处理结合使用?

问题描述

我们有一个无限PCollection PCollection<TableRow> source的我们要插入到 BigQuery。

每 500,000 条消息或 5 分钟触发一次窗口的简单“按书本”方法是:

source.apply("GlobalWindow", Window.<TableRow>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterFirst.of(
         AfterPane.elementCountAtLeast(500000),
         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))))
     ).withAllowedLateness(Duration.standardMinutes(1440)).discardingFiredPanes())

您会认为将以下内容应用于触发的窗口/窗格将允许您将触发的窗格的内容写入 BigQuery:

.apply("BatchWriteToBigQuery", BigQueryIO.writeTableRows()
.to(destination)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withNumFileShards(NUM_FILE_SHARDS)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

但这会产生编译错误An exception occured while executing the Java class. When writing an unbounded PCollection via FILE_LOADS, triggering frequency must be specified

相对简单的解决方法是添加.withTriggeringFrequency(Duration.standardMinutes(5))到上述内容中,这基本上会使每五分钟或每 N 条消息插入的想法完全无效并且在这种情况下你也可以摆脱窗口。

有没有办法真正做到这一点?

标签: apache-beamdataflow

解决方案


FILE_LOADS需要触发频率。

如果您想要更多实时结果,那么您可以使用STREAMING_INSERTS

参考https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#FILE_LOADS


推荐阅读