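google-bigquery - Read Avro files and write them to a BigQuery table
Question
My goal is to read Avro file data from Cloud Storage and write it to a BigQuery table using Java. It would be great if someone could provide a code snippet or some ideas for reading Avro-format data and writing it to a BigQuery table with Cloud Dataflow.
Answer
I see two possible approaches:
- Use Dataflow: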
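import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
// In newer Beam releases AvroIO lives in org.apache.beam.sdk.extensions.avro.io instead.
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

public class AvroToBigQuery {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // Read an Avro file using an inline schema.
    // Alternatively, read the schema from a file.
    // https://beam.apache.org/releases/javadoc/2.11.0/index.html?org/apache/beam/sdk/io/AvroIO.html
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\": \"record\", "
            + "\"name\": \"quote\", "
            + "\"fields\": ["
            + "{\"name\": \"source\", \"type\": \"string\"},"
            + "{\"name\": \"quote\", \"type\": \"string\"}"
            + "]}");
    PCollection<GenericRecord> avroRecords = p.apply(
        AvroIO.readGenericRecords(avroSchema).from("gs://bucket/quotes.avro"));

    // Convert Avro GenericRecords to BigQuery TableRows.
    // Avro strings can arrive as org.apache.avro.util.Utf8 or java.lang.String,
    // so call toString() rather than casting. Avro-generated classes avoid this altogether.
    // https://beam.apache.org/documentation/io/built-in/google-bigquery/#writing-to-bigquery
    PCollection<TableRow> bigQueryRows = avroRecords.apply(
        MapElements.into(TypeDescriptor.of(TableRow.class))
            .via(
                (GenericRecord elem) ->
                    new TableRow()
                        .set("source", elem.get("source").toString())
                        .set("quote", elem.get("quote").toString())));

    // The BigQuery schema mirrors the Avro schema: both fields are strings.
    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema bigQuerySchema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("source").setType("STRING"),
                    new TableFieldSchema().setName("quote").setType("STRING")));

    bigQueryRows.apply(
        BigQueryIO.writeTableRows()
            .to(new TableReference()
                .setProjectId("project_id")
                .setDatasetId("dataset_id")
                .setTableId("avro_source"))
            .withSchema(bigQuerySchema)
            // Create the table if it does not exist; replace any rows already in it.
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

    p.run().waitUntilFinish();
  }
}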
- Load the data directly into BigQuery without Dataflow. See this documentation: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
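In the Dataflow approach, the BigQuery schema is written by hand to mirror the Avro schema, whereas a direct load job derives the column types from the Avro schema itself. The following is a minimal, stdlib-only sketch of that type mapping for simple Avro types. The class `AvroToBigQueryTypes` and its method `bigQueryType` are hypothetical helpers, not part of Beam or the BigQuery client, and the mappings follow BigQuery's Avro conversion table as I understand it, so check the linked documentation before relying on them. Logical types, unions, records, arrays, and maps need recursive handling and are deliberately left out.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical helper (not part of Beam or the BigQuery client library):
 * maps a simple Avro type name to the BigQuery column type a load job would produce.
 */
public class AvroToBigQueryTypes {

    // Assumed mappings for simple Avro types, based on BigQuery's Avro conversion table.
    // Complex types (record, array, map, union) and logical types are intentionally omitted.
    private static final Map<String, String> SIMPLE_TYPES = Map.of(
            "boolean", "BOOLEAN",
            "int", "INTEGER",
            "long", "INTEGER",
            "float", "FLOAT",
            "double", "FLOAT",
            "bytes", "BYTES",
            "fixed", "BYTES",
            "string", "STRING",
            "enum", "STRING");

    /** Returns the BigQuery type for an Avro type name, or empty if no simple mapping exists. */
    public static Optional<String> bigQueryType(String avroType) {
        return Optional.ofNullable(SIMPLE_TYPES.get(avroType));
    }

    public static void main(String[] args) {
        // Both fields of the "quote" schema are Avro strings, hence STRING columns.
        System.out.println(bigQueryType("string").orElse("UNSUPPORTED")); // STRING
        System.out.println(bigQueryType("long").orElse("UNSUPPORTED"));   // INTEGER
        System.out.println(bigQueryType("record").orElse("UNSUPPORTED")); // UNSUPPORTED
    }
}
```

A lookup table keeps the sketch easy to extend. A full converter would instead walk `org.apache.avro.Schema` recursively and emit `TableFieldSchema` objects with the appropriate mode (`REPEATED` for arrays, `NULLABLE` for unions with null).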