首页 > 解决方案 > 使用 Dataflow 更新特定 BQ 分区或数据范围

问题描述

您好,我正在努力寻找是否可以这样做。我有一个从存储加载数据并应用一些转换的管道,转换结束后,我在 BigQuery 上上传该数据,但我总是需要处理我拥有的全部数据,但是这个过程加班开始有点辣GCP 计费。

我试图找出某种方式,我只能截断某些分区或删除某些数据范围以插入新的数据。

我创建了这个伪代码,我实际上做了什么。


//Create My Pipeline
Pipeline  p  = ....;

//Do some transformations
PCollection<SomeModel> processed_data = p.apply(ParDo....);


//Format the results to Write on BQ
PCollection<TableRow> rows = processed_data.apply("Format to BQ",Map...);

//Save on Bq with partion by Date
rows.apply("Save it",
BigQueryIO.writeTableRows().to(tableName).withSchema(TableSchema)
        .withTimePartitioning(new TimePartitioning().setField("date"))
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

p.run();

最后,我使用 BigQueryIO 写入 Bigquery,但我总是需要截断表

Obs:至少我需要每天重新处理最后 30 天,所以我不能每天增加数据。

Ob2:我已经搜索了很多,但找不到本地方法,我看到人们建议将数据加载到表中并创建计划作业以将其导入生产表或某种类型,或在 DoFn 调用中BigQuery API 在启动生产流水线之前删除数据。

标签: javagoogle-cloud-platformgoogle-bigquerygoogle-cloud-dataflowapache-beam

解决方案


推荐阅读