google-cloud-platform - Dataflow batch insert into BigQuery cannot find the dataset in the asia-northeast1 region
Problem description
We have a BigQuery dataset located in the asia-northeast1 region. We are trying to run a Dataflow job that reads events from Pub/Sub and writes them to BigQuery via the BigQueryIO FILE_LOADS method.
When we run the Dataflow job to insert into a BigQuery dataset located in the US region it works, but when we run it to insert into the BigQuery dataset located in asia-northeast1, it gives us the following error.
java.io.IOException: Unable to insert job: beam_load_starterpipelinechintan0sureliya0619062952b1babd6f_bc8eb81e4c0f48c5bc9fe8268596d9e5_2aa762b44d22c1bad4faa10e3e95f341_00001_00000-0, aborting after 9 .
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startJob(BigQueryServicesImpl.java:232)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startJob(BigQueryServicesImpl.java:203)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startLoadJob(BigQueryServicesImpl.java:143)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:262)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables.access$600(WriteTables.java:79)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.processElement(WriteTables.java:157)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
"code" : 404,
"errors" : [ {
"domain" : "global",
"message" : "Not found: Dataset datapipelinedev:event",
"reason" : "notFound"
} ],
"message" : "Not found: Dataset datapipelinedev:event"
}
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1065)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startJob(BigQueryServicesImpl.java:217)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startJob(BigQueryServicesImpl.java:203)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl.startLoadJob(BigQueryServicesImpl.java:143)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:262)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables.access$600(WriteTables.java:79)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.processElement(WriteTables.java:157)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:60)
com.google.cloud.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:300)
com.google.cloud.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:226)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:35)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1211)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:959)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
We previously tried streaming inserts into the BigQuery dataset located in asia-northeast1, and that worked, but we need to use batch load jobs to avoid the BigQuery streaming buffer.
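For context, the streaming-insert variant that did work can be sketched roughly as follows (a minimal fragment, not our exact code; the table name is hypothetical). Streaming inserts go through the tabledata.insertAll API with no intermediate GCS staging or load job, which is presumably why that path had no trouble with the dataset's region:

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

// Streaming variant that worked against asia-northeast1.
// Caveat: rows inserted this way sit in the streaming buffer for a while,
// which is exactly what we want to avoid by switching to FILE_LOADS.
BigQueryIO.writeTableRows()
    .to("datapipelinedev:event.some_table")  // hypothetical table name
    .withMethod(Method.STREAMING_INSERTS)
    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND);
```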
While debugging, we found the following:
- Dataflow writes the table rows to our GCS temp location, which is in the asia-northeast1 region
- Dataflow then picks up those temp files and tries to load them into the destination table, but fails because it cannot find the BigQuery dataset
We tried to find out whether it is possible to set a BigQueryIO region for BigQueryIO.writeTableRows(), but could not find such an option.
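While BigQueryIO has no per-transform region setting, two related knobs do exist (hedged: neither is documented as a fix for this specific 404, they are just the region-adjacent options we are aware of): the Dataflow job region on the pipeline options, and an explicit GCS staging bucket for the load files via withCustomGcsTempLocation. A minimal sketch, assuming a hypothetical bucket gs://my-asia-temp-bucket created in asia-northeast1:

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
// Run the Dataflow workers in the same region as the dataset.
options.setRegion("asia-northeast1");

// FILE_LOADS can be pointed at an explicit staging bucket; the bucket
// (hypothetical name) must be created in asia-northeast1, since BigQuery
// load jobs can only read from a bucket co-located with the dataset.
BigQueryIO.writeTableRows()
    .withCustomGcsTempLocation(
        StaticValueProvider.of("gs://my-asia-temp-bucket/bq-load-temp"));
```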
Below is the piece of code that inserts into BigQuery using BigQueryIO dynamic destinations with the FILE_LOADS method.
windowed_items.apply("batch_insert_into_respective_table", new ReadEventJson_bigquery())
    .apply("Write_events_to_BQ",
        BigQueryIO.writeTableRows()
            .to(new DynamicDestinations<TableRow, String>() {
              @Override
              public String getDestination(ValueInSingleWindow<TableRow> element) {
                return EventSchemaBuilder
                    .fetch_destination_based_on_event(element.getValue().get("event").toString());
              }

              @Override
              public TableDestination getTable(String table) {
                String destination = EventSchemaBuilder.fetch_table_name_based_on_event(table);
                return new TableDestination(destination, destination);
              }

              @Override
              public TableSchema getSchema(String table) {
                return EventSchemaBuilder.fetch_table_schema_based_on_event(table);
              }
            })
            .withMethod(Method.FILE_LOADS)
            .withTriggeringFrequency(Duration.standardMinutes(5))
            .withNumFileShards(1000)
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));
Solution