google-bigquery - Bigquery 存储 API 多处理段错误
问题描述
长期读者,第一次海报。我正在使用 BigQuery Storage API Python 客户端库,并且在使用 Python 多处理拆分读者时遇到了一些麻烦。
文档中有一条注释说:
因为这个客户端使用了 grpcio 库,所以跨线程共享实例是安全的。在多处理场景中,最佳实践是在multiprocessing.Pool 或 multiprocessing.Process 调用 os.fork()之后创建客户端实例。
我认为我这样做是正确的……但我一定不是。
这是我目前的代码。目标是读取多个并行流中的 BQ 表,然后将数据行写入单个 CSV 文件。创建完所有 CSV 文件后,我将执行一个简单的 cat 命令来组合它们。
附带说明一下,此代码实际上适用于小型 BigQuery 表,但在尝试下载大型 BQ 表时它会因段错误而失败。
import faulthandler
faulthandler.enable()
from google.cloud.bigquery_storage import BigQueryReadClient
from google.cloud.bigquery_storage import types
import multiprocessing as mp
import psutil
import os
import sys
import csv
from datetime import datetime
def extract_table(i):
client_in = BigQueryReadClient()
reader_in = client_in.read_rows(session.streams[i].name, timeout=10000)
rows = reader_in.rows(session)
csv_file = "/home/user/sas/" + table_name + "_" + str(i) + ".csv"
print(f"Starting at time {datetime.now()} for file {csv_file}")
try:
with open(csv_file, 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
if i == 0:
writer.writeheader()
else:
pass
for data in rows:
# print(data)
writer.writerow(data)
except IOError:
print("I/O error")
print(f"Finished at time {datetime.now()} for file {csv_file}")
return
if __name__ == '__main__':
# Get input args
project_id = sys.argv[1]
db_name = sys.argv[2]
table_name = sys.argv[3]
n = len(sys.argv[4])
a = sys.argv[4][1:n - 1]
csv_columns = a.replace("'", '').split(', ')
output_type = sys.argv[5] # csv or sas
bucket_root = sys.argv[6]
# The read session is created in this project. This project can be
# different from that which contains the table.
client = BigQueryReadClient()
table = "projects/{}/datasets/{}/tables/{}".format(
project_id, db_name, table_name
)
requested_session = types.ReadSession()
requested_session.table = table
# This API can also deliver data serialized in Apache Arrow format.
# This example leverages Apache Avro.
requested_session.data_format = types.DataFormat.AVRO
# We limit the output columns to a subset of those allowed in the table
requested_session.read_options.selected_fields = csv_columns
ncpus = psutil.cpu_count(logical=False)
if ncpus <= 2:
ncpus_buffer = 2
else:
ncpus_buffer = ncpus - 2
print(f"You have {ncpus} cores according to psutil. Using {ncpus_buffer} cores")
parent = "projects/{}".format(project_id)
session = client.create_read_session(
parent=parent,
read_session=requested_session,
max_stream_count=ncpus_buffer,
)
print(f"There are {len(session.streams)} streams")
num_streams = int(len(session.streams))
with mp.Pool(processes=ncpus_buffer) as p:
result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)
使用以下命令样式调用代码:
python /home/user/sas/bq_extract_2.py gc-project-id dataset table "['column1', 'column2']" csv 'path/to/gcs/bucket'
同样,这适用于小表,有几次我已经让它在 50-100 GB 大小范围内的非常大的 BQ 表上工作。但是,大多数情况下,大型表失败并出现以下错误:
有 1000 个流根据 psutil,您有 2 个核心。从 2020-11-17 17:46:04.645398 开始对文件 /home/user/sas/diag_0.csv 使用 2 个内核
文件 /home/user/sas/diag_1.csv 从 2020-11-17 17:46:04.829381 开始
致命的 Python 错误:分段错误
线程 0x00007f4293f94700(最近调用优先):文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/site-packages/grpc/_channel.py”,channel_spin 文件中的第 1235 行“/home/ user/anaconda3/envs/sas-controller/lib/python3.8/threading.py”,运行文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py”中的第 870 行,_bootstrap_inner 文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py”中的第 932 行,_bootstrap 中的第 890 行
线程 0x00007f42bc4c9740(最近调用优先):文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/csv.py”,_dict_to_list 文件中的第 151 行“/home/user/anaconda3/envs/ sas-controller/lib/python3.8/csv.py”,写入行文件“/home/user/sas/bq_extract_2.py”中的第 154 行,extract_table 文件中的第 39 行“/home/user/anaconda3/envs/sas- controller/lib/python3.8/multiprocessing/pool.py”,mapstar 文件中的第 48 行“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”,第 125 行工作文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py”,运行文件“/home/user/anaconda3/envs/sas-controller/lib/”中的第 108 行python3.8/multiprocessing/process.py",_bootstrap 文件中的第 315 行"/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/popen_fork.py”,_launch 文件中的第 75 行“/home/user/anaconda3/envs/sas-controller/lib/python3.8 /multiprocessing/popen_fork.py”,第 19 行初始化 文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py”,_Popen 文件中的第 277 行“/home/user/anaconda3/envs/sas-controller/lib/ python3.8/multiprocessing/process.py”,开始文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”中的第 121 行,_repopulate_pool_static 文件中的第 326 行“/ home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”,_repopulate_pool 文件中的第 303 行“/home/user/anaconda3/envs/sas-controller/lib/python3.8/ multiprocessing/pool.py”,初始化 文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py”中的第 212 行,池文件“/home/user/”中的第 119 行sas/bq_extract_2.py",模块中的第 157 行
编辑 1:将 .read_rows 上的超时更新为 10000,以允许从 BQ 读取大结果。还将 max_stream_count 更改为等于池将使用的核心数。这似乎对我的测试有很大帮助,但是当我在 Google Cloud Compute 实例上将其作为启动脚本运行时,控制台输出中仍然会出现段错误。
编辑 2:我研究得越多,似乎就越不可能有效地将 Python 多处理与 Google BigQuery Storage API 一起使用。鉴于需要在调用 os.fork()后创建读取会话,我无法确保为各个进程分配正确的读取行数。每个会话都与它所附加的 BQ 表创建自己的一对多(一个会话对多个流)关系,并且每个会话似乎在流中分解表行的方式略有不同。
举个例子,我们想用 3 个进程导出一个有 30 行的表,每个进程处理一个行流。格式化在移动设备上可能看起来很奇怪。
os.fork()
Process 1 Process 2 Process 3
Session1 Session2 Session3
*Stream1 - 10 rows Stream1 - 8 rows Stream1 - 9 rows
Stream2 - 10 rows *Stream2 - 12 rows Stream2 - 11 rows
Stream3 - 10 rows Stream3 - 10 rows *Stream3 - 10 rows
在此示例中,我们最终得到 32 个输出行,因为每个会话没有以完全相同的方式定义其流。
我尝试使用线程(下面的代码)而不是进程,这很有效,因为 gRPC 是线程安全的。
# create read session here
# Then call the target worker function with one thread per worker
for i in range(0, num_streams):
t = threading.Thread(target=extract_table, args=(i,))
t.start()
但是,这样做的最大问题是使用 8 个线程与使用 1 个线程所花费的时间一样长,并且无论您现在使用多少线程,跨线程的总吞吐量似乎最大约为 5 MB/s。
这与使用吞吐量似乎随着工人的添加而线性扩展的流程形成对比(我在某些测试中看到高达〜100 MB / s)......在极少数情况下,我能够让它在没有a的情况下工作段错误中断事情。这似乎只是纯粹的运气。
使用 1 个线程:
总时间:~ 3:11
使用 8 个线程:
总时间:~ 3:15
据我所知,使用多个线程基本上没有速度优势。
如果有人对我缺少的任何东西有任何想法,请告诉我!我希望能够让它发挥作用。我真的很喜欢 BQ Storage API 的功能(行过滤器、列选择、没有导出限制),但是在我们找到合适的方式来分散读者之前,我们将无法使用它。
解决方案
grpcio(由库使用)和多处理存在一个已知问题。google-cloud-bigquery-storage
根据此代码示例,“在任何 gRPC 服务器启动之前分叉工作子进程”。
由于您的工作负载主要受 I/O 限制,因此全局解释器锁不应该是主要的性能瓶颈。我建议使用线程来分配工作,就像在library中所做的那样google-cloud-bigquery
。
代替:
with mp.Pool(processes=ncpus_buffer) as p:
result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)
和:
with concurrent.futures.ThreadPoolExecutor(max_workers=num_streams) as p:
result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)
推荐阅读
- r - 容器中的 R odbc 包无法正确读取字符串
- java - My Server-Client Messenger Doesn't Seem To Be Sending/Recieving Messages - Java
- kubernetes - 来自我的 DNS 提供商的 Kubernetes 网络瓶颈?
- html - 如何根据数组中的注释数量动态创建 Quill 编辑器?
- python - 我正在尝试使用 re.findall() 提取电话号码。但它显示 TypeError: expected string or bytes-like object
- docker - 运行 Jmeter 负载测试时出现 ZuulException
- c# - SQL Server TLS 1.2 通信真的加密了吗?
- javascript - 如何删除特定行而不是始终删除第一行(模态)
- java - 自使用 spring.main.lazy-initialization: true 的 spring boot 2.5.x 以来,使用数据库的测试失败
- java - 反应原生错误:类 PackageList 是公共的,应该在名为 PackageList.java 的文件中声明