首页 > 解决方案 > 并行化从 BigQuery 到 Python 中的本地 JSON 文件的导出

问题描述

我们在 BigQuery 中有一个表,我们需要将其导出到本地以换行符分隔的 JSON 文件中。使用 BigQuery 的导出到 GCS 功能是有问题的,因为它将整数类型转换为字符串,请参阅将 bigquery 数据导出到云存储,整数字段更改为字符串格式但浮点格式保持为数字格式 以及如何在导出为 JSON 时保留整数数据类型?我自己试过了,整数都丢失了。我们提出了以下解决方案,它维护整数类型,但速度很慢:

当前工作代码

bq = bigquery.Client()
our_query = "select * from our_project.our_dataset.our_bq_table"

results_row_iter = bq.query(our_query) # google.bigquery.rowIterator
counter = 0
with open('/tmp/output_file.json', 'w') as f:
    for row in results_row_iter:
        f.write(json.dumps(dict(row), default=str) + '\n') # dumps as ndjson

our_bq_table在 BigQuery 中是 5GB,有 340 万行和大约 100 个字段,上面的 for 循环在我们的表上需要 90 分钟。our_bq_table在整数列上进行分区confId,并且表中有大约 100 个唯一的 confId,其值为 1 - 100。我们希望利用分区键 + 并行化来加速这个过程......不知何故。

我们要做什么的伪代码

bq = bigquery.Client()
base_query = "select * from our_project.our_dataset.our_bq_table"

all_conf_ids = range(1, 100)

def dump_conf_id(base_query, id):
    iter_query = f"{base_query} where confId = {id}"
    results_row_iter = bq.query(iter_query)
    counter = 0
    with open(f'output_file-{id}.json', 'w') as f:
        for row in results_row_iter:
            f.write(json.dumps(dict(row), default=str) + '\n') # dumps as ndjson

in parallel:
    for id in all_conf_ids:
        dump_conf_id(id)

# last step, perhaps concat the separate files into 1 somehow, assuming there are multiple output files...

这种方法利用该confId字段,以便我们的 BigQuery 查询保持较小。我不太确定如何在伪代码之外实现这一点,并且对弄清楚多线程、多处理和其他在 python 中并行化的方法感到不知所措。我们的最终输出需要是单个输出文件,伪代码转储到单独的文件中,但如果我们可以并行转储到单个文件中,那就太好了。

编辑:在实施解决方案之前我们试图解决的一个关键问题是我们应该为此使用多处理还是多线程,因为是并行转储到本地 .json ......

标签: pythonmultithreadingparallel-processinggoogle-bigquery

解决方案


两条经验法则:

  • 在 Python 中,您可以在程序很大程度上受 IO 限制的情况下使用多线程,但如果它受 CPU 限制,则必须使用多处理。这是因为 Python 的Global Interpreter Lock
  • 您不应该在同一个程序中混合使用多线程和多处理。

在这种情况下,我猜你的问题(每个结果导出一个 JSON)是 CPU 密集型的,所以我建议多处理。这是一个代码示例:

from multiprocessing import Pool

n_jobs = 4
with Pool(n_jobs) as p:
    # call dump_conf_id on each member of all_conf_ids
    print(p.map(dump_conf_id, all_conf_ids))

[...] 但如果我们可以并行转储到单个文件中,那就太好了。

我不会费心尝试将它们写入单个文件。读取文件并将它们连接起来可能是您正在做的最快的部分。一个快速的基准测试显示它以大约 780 MB/s 的速度运行:

import subprocess

with open("output.json", "wb") as f:
    filenames = [f'output_file-{id}.json' for i in all_conf_ids]
    subprocess.check_call(["cat", *filesnames], stdout=f)

推荐阅读