首页 > 解决方案 > 如何在 Apache Beam 中定期从 BigQuery 读取数据?

问题描述

我想在 Beam 中定期从 Bigquery 读取数据,测试代码如下

pipeline.apply("Generate Sequence",
            GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
    .apply("Read from BQ", new ReadBQ())
    .apply("Convert Row",
            MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow))
    .apply("Map TableRow", ParDo.of(new MapTableRowV1()))
    ;


static class ReadBQ extends PTransform<PCollection<Long>, PCollection<TableRow>> {
    @Override
    public PCollection<TableRow> expand(PCollection<Long> input) {
        BigQueryIO.TypedRead<TableRow> rows = BigQueryIO.readTableRows()
            .fromQuery("select * from project.dataset.table limit 10")
            .usingStandardSql();
        return rows.expand(input.getPipeline().begin()); 
    }
}

static class MapTableRowV1 extends DoFn<AdUnitECPM, Void> {
    @ProcessElement
    public void processElement(ProcessContext pc) {
        LOG.info("String of mydata is " + pc.element().toString());
    }
}

在此处输入图像描述

既然BigQueryIO.TypedRead是相关的PBegin,一招就是在ReadBQthrough中完成的rows.expand(input.getPipeline().begin())。但是,此作业不会每两分钟运行一次。如何定期从 bigquery 中读取数据?

标签: google-bigquerygoogle-cloud-dataflowapache-beam

解决方案


正如您在问题中提到的,BigQueryIO 读取转换以 PBegin 开头,它将它放在图表的开头。为了实现您的目标,您需要直接在 DoFn 中使用 BigQuery 客户端库。

举个例子,看看这个 变换

对于少量数据,为此使用普通的 DoFn 是可以的,但对于大量数据,您将需要考虑在SDF中实现该逻辑。


推荐阅读