首页 > 解决方案 > 将 BigQuery 脚本的结果返回给 Python 客户端

问题描述

截至 2019 年秋季,BigQuery 支持脚本,这很棒。我无法弄清楚BigQuery 的 Python 客户端是否能够利用这个新功能。

例如,运行以下 Python 代码:

client = bigquery.Client()
QUERY = """
BEGIN
    CREATE OR REPLACE TEMP TABLE t0 AS
        SELECT * FROM my_dataset.my_table WHERE foo < 1;

    SELECT SUM(bar) AS bar_sum FROM t0;

    DROP TABLE IF EXISTS t0;
END;
"""

query_job = client.query(QUERY)
rows = query_job.result()

google.cloud.bigquery.table._EmptyRowIterator...即使我能够看到 SQL 脚本中的语句已从 BigQuery 的 Web UI 成功运行,也会返回一个对象。

如何将此标准 SQL 脚本中的 SELECT 语句的结果返回给 Python 客户端?

标签: pythonpython-3.xgoogle-bigquery

解决方案


它受支持,但您需要考虑以下文档

脚本在 BigQuery 中使用 jobs.insert 执行,类似于任何其他查询,多语句脚本指定为查询文本。当脚本执行时,会为脚本中的每个语句创建额外的作业, 称为子作业。您可以通过调用jobs.list 枚举脚本的子作业,将脚本的作业ID 作为parentJobId 参数传入。

当在脚本上调用jobs.getQueryResults 时,它将返回脚本中 要执行的最后一个SELECT、DML 或DDL 语句的查询结果,如果以上语句均未执行,则没有查询结果。要获取脚本中所有语句的结果,请枚举子作业并在每个子作业上调用 jobs.getQueryResults。

例如,我修改了您的脚本以查询公共表:bigquery-public-data.london_bicycles.cycle_stations. 这运行三个子作业:

在此处输入图像描述

最后一个删除表并且不返回任何行

在此处输入图像描述

这就是为什么,如果我运行 Python 文件,我会得到类似<google.cloud.bigquery.table._EmptyRowIterator object at 0x7f440aa33c88>.

我们想要的是中间查询的输出结果

在此处输入图像描述

一个快速测试是注释掉DROP语句,然后遍历行以获得sum=6676. 那么,如果我们想要中间结果呢?与前面引用的文档一样,答案是调用jobs.list并将脚本作业 ID 作为parentJobId参数传递以获取子作业 ID:

for job in client.list_jobs(parent_job=query_job.job_id):
    print("Job ID: {}, Statement Type: {}".format(job.job_id, job.statement_type))

我们使用该list_jobs方法并检查ID 和语句类型

Job ID: script_job_80e...296_2, Statement Type: DROP_TABLE
Job ID: script_job_9a0...7fd_1, Statement Type: SELECT
Job ID: script_job_113...e13_0, Statement Type: CREATE_TABLE_AS_SELECT

SELECT请注意,后缀 (0, 1, 2) 表示执行顺序,但我们可以在检索结果之前添加双重检查以验证作业实际上是语句:

from google.cloud import bigquery

client = bigquery.Client()
QUERY = """
BEGIN
    CREATE OR REPLACE TEMP TABLE t0 AS
        SELECT name, bikes_count FROM `bigquery-public-data.london_bicycles.cycle_stations` WHERE bikes_count > 10;

    SELECT SUM(bikes_count) AS total_bikes FROM t0;

    DROP TABLE IF EXISTS t0;
END;
"""

query_job = client.query(QUERY)
query_job.result()

for job in client.list_jobs(parent_job=query_job.job_id):  # list all child jobs
    # print("Job ID: {}, Statement Type: {}".format(job.job_id, job.statement_type))
    if job.statement_type == "SELECT":  # print the desired job output only
        rows = job.result()
        for row in rows:
            print("sum={}".format(row["total_bikes"]))

输出:

sum=6676

推荐阅读