首页 > 解决方案 > Dataflow 作业成功完成后如何运行 BigQuery

问题描述

我正在尝试在数据流作业成功完成后立即在 BigQuery 中运行查询。我在 main.py 中定义了 3 个不同的函数。

第一个用于运行数据流作业。第二个检查数据流作业状态。最后一个在 BigQuery 中运行查询。

问题是第二个函数在一段时间内多次检查数据流作业状态,并且在数据流作业成功完成后,它并没有停止检查状态。然后由于“功能加载尝试超时”错误,功能部署失败。

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
import os
import re
import config
from google.cloud import bigquery
import time

global flag

def trigger_job(gcs_path, body):
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)
    request = service.projects().templates().launch(projectId=config.project_id, gcsPath=gcs_path, body=body)
    response = request.execute()

def get_job_status(location, flag):
    credentials=GoogleCredentials.get_application_default()
    dataflow=build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)
    result=dataflow.projects().jobs().list(projectId=config.project_id, location=location).execute()

    for job in result['jobs']:
        if re.findall(r'' + re.escape(config.job_name) + '', job['name']):
            while flag==0:
                if job['currentState'] != "JOB_STATE_DONE":
                    print('NOT DONE')
                else:
                    flag=1
                    print('DONE')
                
                    break

def bq(sql):
    client = bigquery.Client()
    query_job = client.query(sql, location='US')

gcs_path = config.gcs_path
body=config.body
trigger_job(gcs_path,body)
flag=0
location='us-central1'
get_job_status(location,flag)
sql= """CREATE OR REPLACE TABLE 'table' AS SELECT * FROM 'table'"""
bq(SQL)

Cloud Function 超时设置为 540 秒,但部署会在 3-4 分钟内失败。

非常感谢任何帮助。

标签: google-cloud-platformgoogle-apigoogle-bigquerygoogle-cloud-functionsgoogle-cloud-dataflow

解决方案


如果您的 HTTP 触发的云函数未返回 HTTP 响应,则从代码片段中可以看出。

所有基于 HTTP 的云函数都必须返回 HTTP 响应才能正确终止。从谷歌文档确保 HTTP 函数发送 HTTP 响应(强调 - 我的):

如果您的函数是 HTTP 触发的,请记住发送 HTTP 响应,如下所示。不这样做可能会导致您的函数执行到 timeout。如果发生这种情况,您将被收取整个超时时间的费用。超时还可能导致后续调用出现不可预测的行为或冷启动,从而导致不可预测的行为或额外的延迟。

因此,您必须有一个main.py返回某种值的函数,理想情况下是可以强制转换为 Flask http 响应的值。


推荐阅读