Catching BigQuery HttpBadRequestError on a Dataflow streaming pipeline

Problem description

Recently, my Dataflow streaming job started throwing HttpBadRequestError from the BigQuery API because the request payload size exceeded the limit.

    Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
    return self._flush_all_batches()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
    *[
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
    self._flush_batch(destination)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
    passed, errors = self.bigquery_wrapper.insert_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
    result, errors = self._insert_all_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
    response = self.client.tabledata.InsertAll(request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 400,
    "message": "Request payload size exceeds the limit: 10485760 bytes.",
    "errors": [
      {
        "message": "Request payload size exceeds the limit: 10485760 bytes.",
        "domain": "global",
        "reason": "badRequest"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}
>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 258, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 315, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 990, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 730, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 732, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 733, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 1267, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 1248, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
    return self._flush_all_batches()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
    *[
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
    self._flush_batch(destination)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
    passed, errors = self.bigquery_wrapper.insert_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
    result, errors = self._insert_all_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
    response = self.client.tabledata.InsertAll(request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 400,
    "message": "Request payload size exceeds the limit: 10485760 bytes.",
    "errors": [
      {
        "message": "Request payload size exceeds the limit: 10485760 bytes.",
        "domain": "global",
        "reason": "badRequest"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}
> [while running 'WriteBqTables/WriteBQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-25875']

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service.cc:631

I would like to use the dead letter pattern to mitigate this error in case it happens again.

Does the BQ dead letter pattern also work when an HttpBadRequestError occurs, or does it only work when inserting a row fails because of a schema mismatch? I am using Apache Beam SDK for Python, version 2.27.0.

Thanks in advance.

Update 2021-02-24: I added a larger stack trace snippet to show where the error occurs.

Tags: python, google-cloud-dataflow, apache-beam

Solution

Yes, the pattern will work. In general, it catches any failure that can be caught (sometimes things fail so badly that processing stops entirely).

In your particular case, the stack trace goes through this area of BigQueryIO, and you can see the failed rows being output to the dead-letter PCollection a few lines further down, here.
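
For reference, here is a minimal sketch of that dead-letter pattern in Beam Python. The table names, schema, and the FormatFailures step are placeholders made up for illustration, not taken from the question; in 2.27.0 the rejected rows are exposed under BigQueryWriteFn.FAILED_ROWS, while newer SDKs also expose them as write_result.failed_rows.

    import json

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import BigQueryWriteFn
    from apache_beam.io.gcp.bigquery_tools import RetryStrategy

    with beam.Pipeline() as p:
        rows = p | 'CreateRows' >> beam.Create([{'id': 1, 'payload': 'ok'}])

        # Main write: rows that fail for non-transient reasons are not
        # retried forever, so they end up on the failed-rows output.
        write_result = (
            rows
            | 'WriteToBQ' >> beam.io.WriteToBigQuery(
                table='my-project:my_dataset.my_table',  # placeholder table
                method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
                insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))

        # In Beam 2.27.0 the dead-letter output is keyed by
        # BigQueryWriteFn.FAILED_ROWS; each element is a
        # (destination_table, row) tuple.
        failed_rows = write_result[BigQueryWriteFn.FAILED_ROWS]

        _ = (
            failed_rows
            | 'FormatFailures' >> beam.Map(
                lambda kv: {'destination': str(kv[0]),
                            'row': json.dumps(kv[1])})
            | 'WriteDeadLetter' >> beam.io.WriteToBigQuery(
                table='my-project:my_dataset.my_table_errors',  # placeholder
                schema='destination:STRING,row:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))

The dead-letter write here goes to a second BigQuery table, but the failed-rows PCollection can of course be routed anywhere (Pub/Sub, GCS, logs) depending on how you want to reprocess the rejected records.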
