google-bigquery - 在 Dataflow 管道中写入 BigQuery 表失败
问题描述
我正在开发一个 Dataflow 管道,它正在从谷歌云存储读取一个 protobuf 文件并对其进行解析并尝试写入 BigQuery 表。没有时它工作正常。行数约为 20k 但没有。行数约为 200k,然后失败。下面是示例代码:
Pipeline pipeline = Pipeline.create(options);
PCollection<PBClass> dataCol = pipeline.apply(FileIO.match().filepattern(options.getInputFile()))
.apply(FileIO.readMatches())
.apply("Read GPB File", ParDo.of(new ParseGpbFn()));
dataCol.apply("Transform to Delta", ParDo.of(deltaSchema))
.apply(Flatten.iterables())
.apply(
BigQueryIO
//.write()
.writeTableRows()
.to(deltaSchema.tableSpec)
.withMethod(Method.STORAGE_WRITE_API)
.withSchema(schema)
//.withFormatFunction(irParDeltaSchema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withExtendedErrorInfo()
)
;
尝试了以下方法的不同组合
withMethod
write
withFormatFunction
也不同。工人和不同的计算引擎类型。
每次它卡在GroupByKey
阶段并给出以下错误:
Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)
解决方案
Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)
您收到的错误代码 - 如上所述 - 是因为在您的代码中的某处,当您指定要加载的 GCS 文件时,它的格式不正确,URI 应该看起来像这样 gs://bucket/路径/到/文件。
推荐阅读
- macos - LazyVGrid 中的 NavigationLink 在 swiftUI macOS 中无法正确显示
- f# - 如何使用 F# 的 rx 扩展从简单值创建 Observable?
- windows - iFuse 在 Windows 上访问 iPhone 文件
- mysql - mysql group by 返回所有行
- yocto - meta-qt5 yocto 层无法在 qt 版本 5.15.2 上编译
- python-3.x - 为什么列表变量有时不受函数更改的影响,因为我认为 python3 通过引用传递与列表变量一起工作?
- python - ord () 期望一个字符
- assembly - 在汇编中打印一个三角形的字符
- wordpress - cURL vs wp_remote_get:对于插件中的外部 API 调用,哪一个最可靠?
- websphere - 如何在 Open Liberty 服务器开发模式下调试我的测试(使用 liberty-maven-plugin)并打开/关闭调试器?