首页 > 解决方案 > 如何遍历 BigQuery 查询结果并将其写入文件

问题描述

我需要查询 Google BigQuery 表并将结果导出到 gzip 文件。这是我当前的代码。要求是每行数据都应该是新行 (\n) 删除。

def batch_job_handler(args):

    credentials = Credentials.from_service_account_info(service_account_info)

    client = Client(project=service_account_info.get("project_id"),
                    credentials=credentials)

    query_job = client.query(QUERY_STRING)

    results = query_job.result()  # Result's total_rows is 1300000 records

    with gzip.open("data/query_result.json.gz", "wb") as file:
        data = ""
        for res in results:
            data += json.dumps(dict(list(res.items()))) + "\n"
            break
        file.write(bytes(data, encoding="utf-8"))

上述解决方案对于少量结果非常有效,但是如果结果有 1300000 条记录,则会变得太慢。

是不是因为这一行:json.dumps(dict(list(res.items()))) + "\n"因为我正在通过新行连接每个记录来构造一个巨大的字符串。

当我在 AWS 批处理中运行这个程序时,它消耗了太多时间。我需要帮助以更快的方式迭代结果并写入数百万条记录的文件。

标签: google-bigquerypython-3.7

解决方案


查看新的 BigQuery Storage API 以快速阅读:

有关工作中的 API 示例,请参阅此项目:

与使用以前的基于导出的读取流程相比,它具有许多优势,通常应该会带来更好的读取性能:

  • 直接流式传输

它不会在 Google Cloud Storage 中留下任何临时文件。使用 Avro 有线格式直接从 BigQuery 服务器读取行。

  • 过滤

新的 API 允许列和有限谓词过滤仅读取您感兴趣的数据。

  • 列过滤

由于 BigQuery 由列式数据存储提供支持,因此它可以高效地流式传输数据,而无需读取所有列。

  • 谓词过滤

Storage API 支持有限的谓词过滤器下推。它支持与文字的单一比较


推荐阅读