google-bigquery - 如何在 Apache Beam 中定期从 BigQuery 读取数据?
问题描述
我想在 Beam 中定期从 Bigquery 读取数据,测试代码如下
pipeline.apply("Generate Sequence",
GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
.apply("Read from BQ", new ReadBQ())
.apply("Convert Row",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow))
.apply("Map TableRow", ParDo.of(new MapTableRowV1()))
;
static class ReadBQ extends PTransform<PCollection<Long>, PCollection<TableRow>> {
@Override
public PCollection<TableRow> expand(PCollection<Long> input) {
BigQueryIO.TypedRead<TableRow> rows = BigQueryIO.readTableRows()
.fromQuery("select * from project.dataset.table limit 10")
.usingStandardSql();
return rows.expand(input.getPipeline().begin());
}
}
static class MapTableRowV1 extends DoFn<AdUnitECPM, Void> {
@ProcessElement
public void processElement(ProcessContext pc) {
LOG.info("String of mydata is " + pc.element().toString());
}
}
既然BigQueryIO.TypedRead
是相关的PBegin
,一招就是在ReadBQ
through中完成的rows.expand(input.getPipeline().begin())
。但是,此作业不会每两分钟运行一次。如何定期从 bigquery 中读取数据?
解决方案
推荐阅读
- python - 派生查询上的 Django 内部联接
- xamarin - Android 应用与为 x86 构建的 Android SDK 不兼容
- python - 表的一个字段,在 Django Tables 中为另一个字段所独有
- kdb - 列的条件更新
- ios - 如何使用 Swift 在 iOS 中以编程方式在显示和亮度中打开 NightShift
- java - 如何在构造函数不带参数的对象上制作有效的模拟套件?
- c# - Unity翻转纹理使用c#
- ios - 谷歌地图 - reverseGeocodeCoordinate 不会在新加坡打印地址
- excel - 如何根据单元格中的日期执行If语句,Excel VBA
- pandas - pandas 将 pandas (0.13.1) 更新到可用的最新版本