python - Dataflow cannot obtain a reference to the BigQuery table when using too many cores or more than one machine
Problem description
My streaming Dataflow pipeline is supposed to read analytics hits from Pub/Sub and write them to BigQuery. If I use too many machines, or machines that are too large, a rate-limit error is raised while obtaining a reference to the table, more precisely while executing _get_or_create_table.
The rate limit being hit appears to be one of these: 100 API requests per second per user, or 300 concurrent API requests per user.
It does not block the pipeline (rows do get written after some point), but I suspect it blocks some threads and prevents me from fully exploiting parallelization. Switching from one machine with 4 CPUs to five machines with 8 CPUs each did not improve latency (it actually got worse).
How can I avoid this error and let a large number of machines write to BQ?
Here is the log from the Dataflow monitoring interface. It appears regularly after I start the pipeline:
...
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1087, in get_or_create_table
    found_table = self._get_table(project_id, dataset_id, table_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 184, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 925, in _get_table
    response = self.client.tables.Get(request)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 611, in Get
    config, request, global_params=global_params)
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 722, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 728, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 599, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)
HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables/<table_id>?alt=json>: response: <{'status': '403', 'content-length': '577', 'x-xss-protection': '1; mode=block', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'expires': 'Sun, 25 Nov 2018 14:36:24 GMT', 'vary': 'Origin, X-Origin', 'server': 'GSE', '-content-encoding': 'gzip', 'cache-control': 'private, max-age=0', 'date': 'Sun, 25 Nov 2018 14:36:24 GMT', 'x-frame-options': 'SAMEORIGIN', 'content-type': 'application/json; charset=UTF-8'}>, content <{
  "error": {
    "errors": [
      {
        "domain": "global",
        "reason": "rateLimitExceeded",
        "message": "Exceeded rate limits: Your user_method exceeded quota for api requests per user per method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
        "locationType": "other",
        "location": "helix_api.method_request"
      }
    ],
    "code": 403,
    "message": "Exceeded rate limits: Your user_method exceeded quota for api requests per user per method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors"
Here is the pipeline's code. I trimmed away almost everything to see whether this still happens:
p = beam.Pipeline(options=options)
msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
    topic='projects/{project}/topics/{topic}'.format(
        project=args.project, topic=args.hits_topic),
    id_label='hit_id',
    timestamp_attribute='time')
lines = msgs | beam.Map(lambda x: {'content': x})
(lines
 | 'WriteToBQ' >> beam.io.gcp.bigquery.WriteToBigQuery(args.table,
                                                       dataset=args.dataset,
                                                       project=args.project))
Solution
Try upgrading to the latest apache_beam library (2.12.0 at the time of writing). https://github.com/apache/beam/commit/932e802279a2daa0ff7797a8fc81e952a4e4f252 introduced a table cache; without it, older versions of the library can trigger the rate limits you are running into.
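The idea behind that fix is to look up the table once per process and serve later lookups from memory, so repeated bundles no longer hit the tables.get quota. The following is only a rough illustration of that caching pattern, not the actual Beam code; `FakeBigQueryClient` is a hypothetical stand-in for the real BigQuery API client:

```python
import functools


class FakeBigQueryClient:
    """Hypothetical stand-in for the BigQuery API client; counts tables.get calls."""

    def __init__(self):
        self.get_calls = 0

    def get_table(self, project_id, dataset_id, table_id):
        self.get_calls += 1  # each call here would count against the API quota
        return {'id': '%s.%s.%s' % (project_id, dataset_id, table_id)}


client = FakeBigQueryClient()


@functools.lru_cache(maxsize=None)
def get_or_create_table_cached(project_id, dataset_id, table_id):
    # Only the first lookup per (project, dataset, table) reaches the API;
    # subsequent calls are answered from the in-process cache.
    return client.get_table(project_id, dataset_id, table_id)


# Simulate a streaming worker resolving the table once per bundle.
for _ in range(1000):
    get_or_create_table_cached('my-project', 'my_dataset', 'hits')

print(client.get_calls)  # 1 API call instead of 1000
```

With many workers each making at most one lookup per table, the aggregate request rate stays far below the per-user quotas quoted above, which is why upgrading resolves the 403s.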