Dataflow batch insert into BigQuery unable to find dataset in the asia-northeast1 region

Problem description

We have a BigQuery dataset located in the asia-northeast1 region. We are trying to run a Dataflow job that reads events from Pub/Sub and writes them to BigQuery using the BigQueryIO FILE_LOADS method.
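
For orientation, the upstream part of the pipeline is not shown below; it presumably looks roughly like the following sketch (the topic name, window size, and the options object are our assumptions, not from the original post):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

Pipeline p = Pipeline.create(options); // options: the job's DataflowPipelineOptions

// Read raw event strings from Pub/Sub and window them so that
// FILE_LOADS can emit one batch of load files per window/trigger.
PCollection<String> windowed_items = p
        .apply("Read_from_pubsub",
                PubsubIO.readStrings().fromTopic("projects/datapipelinedev/topics/events")) // placeholder topic
        .apply("Window_events",
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));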

Running the Dataflow job against a BigQuery dataset located in the US region works, but running it against the dataset located in asia-northeast1 gives us the following error.

java.io.IOException: Unable to insert job: beam_load_starterpipelinechintan0sureliya0619062952b1babd6f_bc8eb81e4c0f48c5bc9fe8268596d9e5_2aa762b44d22c1bad4faa10e3e95f341_00001_00000-0, aborting after 9 .
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startJob(BigQueryServicesImpl.java:232)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startJob(BigQueryServicesImpl.java:203)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startLoadJob(BigQueryServicesImpl.java:143)
        org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:262)
        org.apache.beam.sdk.io.gcp.bigquery.WriteTables.access$600(WriteTables.java:79)
        org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.processElement(WriteTables.java:157)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Dataset datapipelinedev:event",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Dataset datapipelinedev:event"
}
        com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startJob(BigQueryServicesImpl.java:217)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startJob(BigQueryServicesImpl.java:203)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startLoadJob(BigQueryServicesImpl.java:143)
        org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:262)
        org.apache.beam.sdk.io.gcp.bigquery.WriteTables.access$600(WriteTables.java:79)
        org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.processElement(WriteTables.java:157)
        org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
        org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:60)
        com.google.cloud.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:300)
        com.google.cloud.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:226)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:35)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1211)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:959)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

We previously tried streaming inserts into the BigQuery dataset in asia-northeast1, and that works, but we need to use batch load jobs to avoid the BigQuery streaming buffer.
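
The streaming variant that worked is not shown in the post; presumably it differed from the FILE_LOADS snippet below only in the write method, roughly like this (destinations stands for the same DynamicDestinations instance used below):

BigQueryIO.writeTableRows()
        .to(destinations)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) // the default method for unbounded input
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);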

While debugging we found the following:

  1. Dataflow writes the table rows to our GCS temp location, which is in the asia-northeast1 region
  2. Dataflow then picks up those temp files and tries to load them into the destination table, but fails because it cannot find the BigQuery dataset (see the job-location sketch after this list)
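
One way to confirm where the load job actually ran is to look it up in both locations, since a job is only visible in the location it was created in. A minimal sketch using the same low-level API client that appears in the stack trace (the project ID comes from the error message; we are assuming a client version recent enough to expose the location parameter on jobs.get):

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.Job;

public class JobLocationCheck {
    public static void main(String[] args) throws Exception {
        Bigquery bigquery = new Bigquery.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                JacksonFactory.getDefaultInstance(),
                GoogleCredential.getApplicationDefault().createScoped(BigqueryScopes.all()))
                .setApplicationName("job-location-check")
                .build();

        String project = "datapipelinedev"; // from the error message
        String jobId = "beam_load_...";     // the failing job ID from the error (truncated here)

        // Probe both locations: the job is only found where BigQuery created it.
        for (String location : new String[] {"US", "asia-northeast1"}) {
            try {
                Job job = bigquery.jobs().get(project, jobId).setLocation(location).execute();
                System.out.println(location + ": found, state = " + job.getStatus().getState());
            } catch (Exception e) {
                System.out.println(location + ": not found");
            }
        }
    }
}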

We tried to find out whether it is possible to set a region for BigQueryIO.writeTableRows(), but could not find such an option.
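
A related knob that does exist on BigQueryIO.Write is withCustomGcsTempLocation, although it only controls where the load files are staged, not the location of the load job itself (and the temp files in this case are already in asia-northeast1). A sketch with a placeholder bucket:

import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

BigQueryIO.writeTableRows()
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // Stage load files in a bucket known to be in asia-northeast1
        // ("gs://my-asia-ne1-bucket/bq-tmp" is a placeholder).
        .withCustomGcsTempLocation(StaticValueProvider.of("gs://my-asia-ne1-bucket/bq-tmp"));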

Below is the piece of code that inserts into BigQuery using BigQueryIO dynamic destinations with the FILE_LOADS method.

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;

windowed_items.apply("batch_insert_into_respective_table", new ReadEventJson_bigquery())
    .apply("Write_events_to_BQ",
            BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
                // Route each row to a destination key derived from its "event" field.
                @Override
                public String getDestination(ValueInSingleWindow<TableRow> element) {
                    return EventSchemaBuilder
                            .fetch_destination_based_on_event(element.getValue().get("event").toString());
                }

                // Map the destination key to a concrete table.
                @Override
                public TableDestination getTable(String table) {
                    String destination = EventSchemaBuilder.fetch_table_name_based_on_event(table);
                    return new TableDestination(destination, destination);
                }

                // Supply the schema for that table.
                @Override
                public TableSchema getSchema(String table) {
                    return EventSchemaBuilder.fetch_table_schema_based_on_event(table);
                }
            }).withMethod(Method.FILE_LOADS)
                    .withTriggeringFrequency(Duration.standardMinutes(5)) // issue load jobs every 5 minutes
                    .withNumFileShards(1000)
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
    );

Tags: google-cloud-platform, google-bigquery, google-cloud-dataflow

Solution
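
A plausible explanation, judging from the symptoms alone: BigQuery load jobs execute in a specific location, and when a job is inserted without one, the service has historically assumed the US. That matches the behaviour above: US datasets work, while the asia-northeast1 dataset fails with "Not found: Dataset datapipelinedev:event" even though the staged files already sit in an asia-northeast1 bucket. Newer Apache Beam releases appear to look up the destination dataset's location and attach it to the load job, so upgrading the Beam SDK is the first thing to try. To illustrate the missing piece, below is a sketch that submits a load job with an explicit location through the same low-level client used in the stack trace (bucket, table, and file names are placeholders, and JobReference.setLocation assumes a reasonably recent client version):

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import java.util.Collections;
import java.util.UUID;

public class ManualRegionalLoad {
    public static void main(String[] args) throws Exception {
        Bigquery bigquery = new Bigquery.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                JacksonFactory.getDefaultInstance(),
                GoogleCredential.getApplicationDefault().createScoped(BigqueryScopes.all()))
                .setApplicationName("manual-regional-load")
                .build();

        // The location on the JobReference is the piece the failing Beam job lacked.
        JobReference ref = new JobReference()
                .setProjectId("datapipelinedev")
                .setJobId("manual_load_" + UUID.randomUUID())
                .setLocation("asia-northeast1");

        JobConfigurationLoad load = new JobConfigurationLoad()
                .setSourceUris(Collections.singletonList(
                        "gs://my-asia-ne1-bucket/bq-tmp/events.json")) // placeholder staged file
                .setSourceFormat("NEWLINE_DELIMITED_JSON")
                .setWriteDisposition("WRITE_APPEND")
                .setDestinationTable(new TableReference()
                        .setProjectId("datapipelinedev")
                        .setDatasetId("event")          // dataset from the error message
                        .setTableId("my_event_table")); // placeholder table

        bigquery.jobs()
                .insert("datapipelinedev",
                        new Job().setJobReference(ref)
                                .setConfiguration(new JobConfiguration().setLoad(load)))
                .execute();
    }
}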

