首页 > 解决方案 > 从 Java 在 BiqQuery 中创建数据加载表

问题描述

根据此处的文档:https : //cloud.google.com/bigquery/docs/tables#creating_a_table_when_you_load_data BigQuery 应该可以从数据创建表。

将数据加载到 BigQuery 时,您可以将数据加载到新表或分区中,可以将数据附加到现有表或分区中,也可以覆盖表或分区。在将数据加载到其中之前,您不需要创建一个空表。您可以同时创建新表并加载数据。

但是,每当我尝试将数据从 Java 流式传输到 BigQuery 时,我都会收到一个错误,即我的表不存在。

这是一个插入语句的示例,但仅在我手动创建表之后才起作用:

InsertAllResponse response = bigQuery
        .insertAll(
                InsertAllRequest
                        .newBuilder(tableId)
                        .addRow(rowContent)
                        .build()
        );

我可以在 Java 中创建架构,然后创建表,但是我必须不断检查架构是否已创建,然后才能流式传输到它。generateBigQuerySchema是我创建的定义架构的方法。如果架构已经存在,下面的代码将失败,所以我必须在创建它之前检查它是否存在。

InsertAllResponse response = bigQuery
        .create(requestLog.generateBigQuerySchema(tableId))
        .getBigQuery()
        .insertAll(
                InsertAllRequest
                        .newBuilder(tableId)
                        .addRow(rowContent)
                        .build()
        );

标签: javagoogle-bigquery

解决方案


我认为您根据API Reference混合了两种不同的资源类型。我的意思是JobsTabledata

乔布斯确实从 Tabledata 中加载 insertAll 方法,但不会

一次将数据流式传输到 BigQuery 一条记录,而无需运行加载作业

我看到 Google 文档可能会像上面那样被误解,因为将数据加载到 BigQuery 中的介绍引用了流式插入 (insertAll)。如下所示:

您可以加载数据:

...通过使用流式插入插入单个记录...

流式插入重定向到 BigQuery 中的流式数据的地方,它讲述了流式传输而不是加载:

您可以选择使用 tabledata().insertAll() 方法一次将数据流式传输到 BigQuery 中,而不是使用作业将数据加载到 BigQuery 中。

关于流式插入 (insertAll)的最后一件事:

确保您对包含目标表的数据集具有写入权限。除非您使用模板表,否则该表必须在您开始向其写入数据之前存在。有关模板表的更多信息,请参阅使用模板表自动创建表。

如果您仍想同时加载而不是使用模板表进行流式处理并创建表,请使用作业并加载作业类型(或其他类型,如果需要)

我的问题中的示例代码:

Insert insert = bigquery.jobs().insert(projectId,
                   new Job().setConfiguration(
                            new JobConfiguration().setLoad(
                                   new JobConfigurationLoad()
                                                .setSourceFormat("NEWLINE_DELIMITED_JSON")
                                                .setDestinationTable(
                                                        new TableReference()
                                                                .setProjectId(projectId)
                                                                .setDatasetId(dataSetId)
                                                                .setTableId(tableId)
                                                )
                                                .setCreateDisposition("CREATE_IF_NEEDED")
                                                .setWriteDisposition(writeDisposition)
                                                .setSourceUris(Collections.singletonList(sourceUri))
                                                .setAutodetect(true)
                                )
                        ));

Job myInsertJob = insert.execute();

推荐阅读