首页 > 解决方案 > 有没有办法从 BigQuery Python 客户端库执行非阻塞 load_job?

问题描述

我有一个使用 Flask_restful、Flask_CORS 和 Marshmallow 的 Flask API。API 执行一些工作以将 *.csv 文件获取到 Cloud Storage(使用 signedURL),确认它已上传,然后创建并执行加载作业以将 csv 从 Storage 传输到 BigQuery。API 中加剧我脱发的部分是调用在 GCP 中执行加载作业,将 csv 文件加载到 BigQuery。这是代码片段:

...
            dataset_ref = bq_client.dataset(target_dataset) 
            job_config.schema =  bq_schema 
            job_config.source_format = SOURCE_FORMAT 
            job_config.field_delimiter =  DELIM  
            job_config.destination_table_description = TARGET_TABLE
            job_config.encoding = ENCODING 
            job_config.max_bad_records = MAX_BAD_RECORDS
            job_config.autodetect = False # Do not autodetect schema
            load_job = bq_client.load_table_from_uri(
                uri, dataset_ref.table(target_table), job_config=job_config
            )  # API request
            load_job.result() # **<-- This is the concern**
            return {"message": "Successfully uploaded to Bigquery"}, 200

传输文件可能需要一些时间,我担心的是,在有一些延迟的期间,网络服务器会在等待传输发生时超时。我更愿意load_job.result()执行、获取作业 ID 并返回 201 响应。然后我可以使用作业 ID 来轮询 GCP 以确定它是否成功,而不会有客户端前端的请求超时的风险,而让用户对它是否成功感到困惑不是。

我知道 load_job.result() 是异步的,但是对于 Flask 没有帮助。我打算改用 Quart 来使用 async/await,但我的其他依赖项不受支持,因此我需要进行大量重构。还有其他人用来解决此类问题的方法吗?干杯

标签: pythonflaskgoogle-cloud-platformgoogle-bigquery

解决方案


夸脱解决不了任何问题。事实上,Quart 仍然需要一个运行环境,它等待并监督阻塞函数并在最后调用你的回调。您的函数必须仍在运行才能执行此操作。

对此有更好的设计。我建议你看看Cloud Task。过程如下:

  • 运行您的加载作业
  • 使用参数中的加载作业 ID 创建任务
  • 退出函数
  • 任务将触发另一个函数,该函数将检查作业是否结束
    • 如果尚未完成,则返回错误代码(不同于 2XX)。
    • 如果完成,返回 OK 返回码 (2XX)

您必须使用重试策略设置 Cloud Task以不立即重试(例如设置min-backoff为 30 秒)


推荐阅读