python - 并行化从 BigQuery 到 Python 中的本地 JSON 文件的导出
问题描述
我们在 BigQuery 中有一个表,我们需要将其导出到本地以换行符分隔的 JSON 文件中。使用 BigQuery 的导出到 GCS 功能是有问题的,因为它将整数类型转换为字符串,请参阅将 bigquery 数据导出到云存储,整数字段更改为字符串格式但浮点格式保持为数字格式 以及如何在导出为 JSON 时保留整数数据类型?我自己试过了,整数都丢失了。我们提出了以下解决方案,它维护整数类型,但速度很慢:
当前工作代码
bq = bigquery.Client()
our_query = "select * from our_project.our_dataset.our_bq_table"
results_row_iter = bq.query(our_query) # google.bigquery.rowIterator
counter = 0
with open('/tmp/output_file.json', 'w') as f:
for row in results_row_iter:
f.write(json.dumps(dict(row), default=str) + '\n') # dumps as ndjson
our_bq_table
在 BigQuery 中是 5GB,有 340 万行和大约 100 个字段,上面的 for 循环在我们的表上需要 90 分钟。our_bq_table
在整数列上进行分区confId
,并且表中有大约 100 个唯一的 confId,其值为 1 - 100。我们希望利用分区键 + 并行化来加速这个过程......不知何故。
我们要做什么的伪代码
bq = bigquery.Client()
base_query = "select * from our_project.our_dataset.our_bq_table"
all_conf_ids = range(1, 100)
def dump_conf_id(base_query, id):
iter_query = f"{base_query} where confId = {id}"
results_row_iter = bq.query(iter_query)
counter = 0
with open(f'output_file-{id}.json', 'w') as f:
for row in results_row_iter:
f.write(json.dumps(dict(row), default=str) + '\n') # dumps as ndjson
in parallel:
for id in all_conf_ids:
dump_conf_id(id)
# last step, perhaps concat the separate files into 1 somehow, assuming there are multiple output files...
这种方法利用该confId
字段,以便我们的 BigQuery 查询保持较小。我不太确定如何在伪代码之外实现这一点,并且对弄清楚多线程、多处理和其他在 python 中并行化的方法感到不知所措。我们的最终输出需要是单个输出文件,伪代码转储到单独的文件中,但如果我们可以并行转储到单个文件中,那就太好了。
编辑:在实施解决方案之前我们试图解决的一个关键问题是我们应该为此使用多处理还是多线程,因为这是并行转储到本地 .json ......
解决方案
两条经验法则:
- 在 Python 中,您可以在程序很大程度上受 IO 限制的情况下使用多线程,但如果它受 CPU 限制,则必须使用多处理。这是因为 Python 的Global Interpreter Lock。
- 您不应该在同一个程序中混合使用多线程和多处理。
在这种情况下,我猜你的问题(每个结果导出一个 JSON)是 CPU 密集型的,所以我建议多处理。这是一个代码示例:
from multiprocessing import Pool
n_jobs = 4
with Pool(n_jobs) as p:
# call dump_conf_id on each member of all_conf_ids
print(p.map(dump_conf_id, all_conf_ids))
[...] 但如果我们可以并行转储到单个文件中,那就太好了。
我不会费心尝试将它们写入单个文件。读取文件并将它们连接起来可能是您正在做的最快的部分。一个快速的基准测试显示它以大约 780 MB/s 的速度运行:
import subprocess
with open("output.json", "wb") as f:
filenames = [f'output_file-{id}.json' for i in all_conf_ids]
subprocess.check_call(["cat", *filesnames], stdout=f)
推荐阅读
- mysql - MYSQL 连接不使用主键
- android - 如何为 IntellJ IDE 社区版正确设置 android SDK 路径?
- oracle - 如何找到oracle表的所有分区信息
- actions-on-google - 无法识别的深度链接回退在模拟器中不起作用
- python - HTTP 错误代码:访问 twitter 高级 API 时出现 429
- ios - 如何使用 NSURLSession 从 Objective-C 中的服务器下载 100 张图像
- typescript - 使用带有 html 和超链接的 emailcomposer ionic 发送电子邮件
- google-chrome - 无法在 Chrome 中连接到 Github 的网站,但是当我使用 Firefox 时它可以工作
- php - Cakephp REST API 报告缺少控制器类
- bash - 在 bash 中调用变量时出现语法错误