java - 使用 Dataflow 更新特定 BQ 分区或数据范围
问题描述
您好,我正在努力寻找是否可以这样做。我有一个从存储加载数据并应用一些转换的管道,转换结束后,我在 BigQuery 上上传该数据,但我总是需要处理我拥有的全部数据,但是这个过程加班开始有点辣GCP 计费。
我试图找出某种方式,我只能截断某些分区或删除某些数据范围以插入新的数据。
我创建了这个伪代码,我实际上做了什么。
//Create My Pipeline
Pipeline p = ....;
//Do some transformations
PCollection<SomeModel> processed_data = p.apply(ParDo....);
//Format the results to Write on BQ
PCollection<TableRow> rows = processed_data.apply("Format to BQ",Map...);
//Save on Bq with partion by Date
rows.apply("Save it",
BigQueryIO.writeTableRows().to(tableName).withSchema(TableSchema)
.withTimePartitioning(new TimePartitioning().setField("date"))
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run();
最后,我使用 BigQueryIO 写入 Bigquery,但我总是需要截断表
Obs:至少我需要每天重新处理最后 30 天,所以我不能每天增加数据。
Ob2:我已经搜索了很多,但找不到本地方法,我看到人们建议将数据加载到表中并创建计划作业以将其导入生产表或某种类型,或在 DoFn 调用中BigQuery API 在启动生产流水线之前删除数据。
解决方案
推荐阅读
- android - 材料自动完成文本视图下拉菜单未正确显示取决于另一个自动完成文本视图选择
- java - keylistener 没有响应任何键?
- c++ - 从函数范围内的参数包访问特定参数,如何?
- javascript - 将对象数组转换为单个数组 ReactJS
- parsing - 使用 Lex 和 Yacc 的矩阵解析器
- java - 不能从另一个类调用静态方法到主类
- python - 使用 quicksight boto3 update_data_source
- ssl - 如何验证 Wildfly Web 服务器是否使用添加的密钥库文件进行 https 连接?
- c++ - 为什么我的或操作员没有按预期工作?
- reactjs - 如何将子组件的密钥传递给父组件