python - 将 BigQuery 脚本的结果返回给 Python 客户端
问题描述
截至 2019 年秋季,BigQuery 支持脚本,这很棒。我无法弄清楚BigQuery 的 Python 客户端是否能够利用这个新功能。
例如,运行以下 Python 代码:
client = bigquery.Client()
QUERY = """
BEGIN
CREATE OR REPLACE TEMP TABLE t0 AS
SELECT * FROM my_dataset.my_table WHERE foo < 1;
SELECT SUM(bar) AS bar_sum FROM t0;
DROP TABLE IF EXISTS t0;
END;
"""
query_job = client.query(QUERY)
rows = query_job.result()
google.cloud.bigquery.table._EmptyRowIterator
...即使我能够看到 SQL 脚本中的语句已从 BigQuery 的 Web UI 成功运行,也会返回一个对象。
如何将此标准 SQL 脚本中的 SELECT 语句的结果返回给 Python 客户端?
解决方案
它受支持,但您需要考虑以下文档:
脚本在 BigQuery 中使用 jobs.insert 执行,类似于任何其他查询,多语句脚本指定为查询文本。当脚本执行时,会为脚本中的每个语句创建额外的作业, 称为子作业。您可以通过调用jobs.list 枚举脚本的子作业,将脚本的作业ID 作为parentJobId 参数传入。
当在脚本上调用jobs.getQueryResults 时,它将返回脚本中 要执行的最后一个SELECT、DML 或DDL 语句的查询结果,如果以上语句均未执行,则没有查询结果。要获取脚本中所有语句的结果,请枚举子作业并在每个子作业上调用 jobs.getQueryResults。
例如,我修改了您的脚本以查询公共表:bigquery-public-data.london_bicycles.cycle_stations
. 这运行三个子作业:
最后一个删除表并且不返回任何行:
这就是为什么,如果我运行 Python 文件,我会得到类似<google.cloud.bigquery.table._EmptyRowIterator object at 0x7f440aa33c88>
.
我们想要的是中间查询的输出结果:
一个快速测试是注释掉DROP
语句,然后遍历行以获得sum=6676
. 那么,如果我们想要中间结果呢?与前面引用的文档一样,答案是调用jobs.list
并将脚本作业 ID 作为parentJobId
参数传递以获取子作业 ID:
for job in client.list_jobs(parent_job=query_job.job_id):
print("Job ID: {}, Statement Type: {}".format(job.job_id, job.statement_type))
Job ID: script_job_80e...296_2, Statement Type: DROP_TABLE
Job ID: script_job_9a0...7fd_1, Statement Type: SELECT
Job ID: script_job_113...e13_0, Statement Type: CREATE_TABLE_AS_SELECT
SELECT
请注意,后缀 (0, 1, 2) 表示执行顺序,但我们可以在检索结果之前添加双重检查以验证作业实际上是语句:
from google.cloud import bigquery
client = bigquery.Client()
QUERY = """
BEGIN
CREATE OR REPLACE TEMP TABLE t0 AS
SELECT name, bikes_count FROM `bigquery-public-data.london_bicycles.cycle_stations` WHERE bikes_count > 10;
SELECT SUM(bikes_count) AS total_bikes FROM t0;
DROP TABLE IF EXISTS t0;
END;
"""
query_job = client.query(QUERY)
query_job.result()
for job in client.list_jobs(parent_job=query_job.job_id): # list all child jobs
# print("Job ID: {}, Statement Type: {}".format(job.job_id, job.statement_type))
if job.statement_type == "SELECT": # print the desired job output only
rows = job.result()
for row in rows:
print("sum={}".format(row["total_bikes"]))
输出:
sum=6676
推荐阅读
- php - 使用 php 文件将来自 **L.tileLayer()** 函数的 GET 请求转发到 localhost
- git - 在与 meld 的 git 合并期间,如果只保存 MERGED,为什么我可以修改 LOCAL 和 REMOTE?
- php - 在 WordPress 网站上添加登录功能并向用户显示特定内容
- python - 将进度条或仪表小部件添加到 PyQtGraph
- django - 如何从石墨烯返回自定义错误消息?
- excel - 使用 VBA 宏在 Excel 中的员工病假表
- postgresql - 如何获取PostgreSQL中的所有类别和子类别?
- maven - SkipException 在课前,双重测试用例被打印在范围报告中
- python - 编码的目标列只显示一个类别?
- html - Firefox 阻止加载 HTML iframe