首页 > 解决方案 > Bigquery 存储 API 多处理段错误

问题描述

长期读者,第一次海报。我正在使用 BigQuery Storage API Python 客户端库,并且在使用 Python 多处理拆分读者时遇到了一些麻烦。

文档中有一条注释说:

因为这个客户端使用了 grpcio 库,所以跨线程共享实例是安全的。在多处理场景中,最佳实践是在multiprocessing.Pool 或 multiprocessing.Process 调用 os.fork()之后创建客户端实例。

我认为我这样做是正确的……但我一定不是。

这是我目前的代码。目标是读取多个并行流中的 BQ 表,然后将数据行写入单个 CSV 文件。创建完所有 CSV 文件后,我将执行一个简单的 cat 命令来组合它们。

附带说明一下,此代码实际上适用于小型 BigQuery 表,但在尝试下载大型 BQ 表时它会因段错误而失败。

import faulthandler
faulthandler.enable()
from google.cloud.bigquery_storage import BigQueryReadClient
from google.cloud.bigquery_storage import types
import multiprocessing as mp
import psutil
import os
import sys
import csv
from datetime import datetime


def extract_table(i):

    client_in = BigQueryReadClient()
    reader_in = client_in.read_rows(session.streams[i].name, timeout=10000)

    rows = reader_in.rows(session)

    csv_file = "/home/user/sas/" + table_name + "_" + str(i) + ".csv"
    print(f"Starting at time {datetime.now()} for file {csv_file}")

    try:
        with open(csv_file, 'w') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
            if i == 0:
                writer.writeheader()
            else:
                pass
            for data in rows:
                # print(data)
                writer.writerow(data)
    except IOError:
        print("I/O error")

    print(f"Finished at time {datetime.now()} for file {csv_file}")
    return


if __name__ == '__main__':
    # Get input args
    project_id = sys.argv[1]
    db_name = sys.argv[2]
    table_name = sys.argv[3]

    n = len(sys.argv[4])
    a = sys.argv[4][1:n - 1]
    csv_columns = a.replace("'", '').split(', ')

    output_type = sys.argv[5]  # csv or sas
    bucket_root = sys.argv[6]

    # The read session is created in this project. This project can be
    # different from that which contains the table.
    client = BigQueryReadClient()

    table = "projects/{}/datasets/{}/tables/{}".format(
        project_id, db_name, table_name
    )

    requested_session = types.ReadSession()
    requested_session.table = table
    
    # This API can also deliver data serialized in Apache Arrow format.
    # This example leverages Apache Avro.
    requested_session.data_format = types.DataFormat.AVRO

    # We limit the output columns to a subset of those allowed in the table
    requested_session.read_options.selected_fields = csv_columns
    
    ncpus = psutil.cpu_count(logical=False)

    if ncpus <= 2:
        ncpus_buffer = 2
    else:
        ncpus_buffer = ncpus - 2

    print(f"You have {ncpus} cores according to psutil. Using {ncpus_buffer} cores")

    parent = "projects/{}".format(project_id)
    session = client.create_read_session(
        parent=parent,
        read_session=requested_session,
        max_stream_count=ncpus_buffer,
    )

    print(f"There are {len(session.streams)} streams")

    num_streams = int(len(session.streams))

    with mp.Pool(processes=ncpus_buffer) as p:
        result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

使用以下命令样式调用代码:

python /home/user/sas/bq_extract_2.py gc-project-id dataset table "['column1', 'column2']" csv 'path/to/gcs/bucket'

同样,这适用于小表,有几次我已经让它在 50-100 GB 大小范围内的非常大的 BQ 表上工作。但是,大多数情况下,大型表失败并出现以下错误:

有 1000 个流根据 psutil,您有 2 个核心。从 2020-11-17 17:46:04.645398 开始对文件 /home/user/sas/diag_0.csv 使用 2 个内核

文件 /home/user/sas/diag_1.csv 从 2020-11-17 17:46:04.829381 开始

致命的 Python 错误:分段错误

线程 0x00007f4293f94700(最近调用优先):文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/site-packages/grpc/_channel.py”,channel_spin 文件中的第 1235 行“/home/ user/anaconda3/envs/sas-controller/lib/python3.8/threading.py”,运行文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py”中的第 870 行,_bootstrap_inner 文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py”中的第 932 行,_bootstrap 中的第 890 行

线程 0x00007f42bc4c9740(最近调用优先):文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/csv.py”,_dict_to_list 文件中的第 151 行“/home/user/anaconda3/envs/ sas-controller/lib/python3.8/csv.py”,写入行文件“/home/user/sas/bq_extract_2.py”中的第 154 行,extract_table 文件中的第 39 行“/home/user/anaconda3/envs/sas- controller/lib/python3.8/multiprocessing/pool.py”,mapstar 文件中的第 48 行“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”,第 125 行工作文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py”,运行文件“/home/user/anaconda3/envs/sas-controller/lib/”中的第 108 行python3.8/multiprocessing/process.py",_bootstrap 文件中的第 315 行"/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/popen_fork.py”,_launch 文件中的第 75 行“/home/user/anaconda3/envs/sas-controller/lib/python3.8 /multiprocessing/popen_fork.py”,第 19 行初始化 文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py”,_Popen 文件中的第 277 行“/home/user/anaconda3/envs/sas-controller/lib/ python3.8/multiprocessing/process.py”,开始文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”中的第 121 行,_repopulate_pool_static 文件中的第 326 行“/ home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”,_repopulate_pool 文件中的第 303 行“/home/user/anaconda3/envs/sas-controller/lib/python3.8/ multiprocessing/pool.py”,初始化 文件“/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py”中的第 212 行,池文件“/home/user/”中的第 119 行sas/bq_extract_2.py",模块中的第 157 行

编辑 1:将 .read_rows 上的超时更新为 10000,以允许从 BQ 读取大结果。还将 max_stream_count 更改为等于池将使用的核心数。这似乎对我的测试有很大帮助,但是当我在 Google Cloud Compute 实例上将其作为启动脚本运行时,控制台输出中仍然会出现段错误。

编辑 2:我研究得越多,似乎就越不可能有效地将 Python 多处理与 Google BigQuery Storage API 一起使用。鉴于需要在调用 os.fork()创建读取会话,我无法确保为各个进程分配正确的读取行数。每个会话都与它所附加的 BQ 表创建自己的一对多(一个会话对多个流)关系,并且每个会话似乎在流中分解表行的方式略有不同。

举个例子,我们想用 3 个进程导出一个有 30 行的表,每个进程处理一个行流。格式化在移动设备上可能看起来很奇怪。

                       os.fork()

Process 1              Process 2              Process 3
Session1               Session2               Session3
*Stream1 - 10 rows     Stream1 - 8 rows       Stream1 - 9 rows
Stream2 - 10 rows      *Stream2 - 12 rows     Stream2 - 11 rows
Stream3 - 10 rows      Stream3 - 10 rows      *Stream3 - 10 rows

在此示例中,我们最终得到 32 个输出行,因为每个会话没有以完全相同的方式定义其流。

我尝试使用线程(下面的代码)而不是进程,这很有效,因为 gRPC 是线程安全的。

# create read session here
    
# Then call the target worker function with one thread per worker
    for i in range(0, num_streams):
        t = threading.Thread(target=extract_table, args=(i,))
        t.start()

但是,这样做的最大问题是使用 8 个线程与使用 1 个线程所花费的时间一样长,并且无论您现在使用多少线程,跨线程的总吞吐量似乎最大约为 5 MB/s。

这与使用吞吐量似乎随着工人的添加而线性扩展的流程形成对比(我在某些测试中看到高达〜100 MB / s)......在极少数情况下,我能够让它在没有a的情况下工作段错误中断事情。这似乎只是纯粹的运气。

使用 1 个线程:

总时间:~ 3:11

使用 8 个线程:

总时间:~ 3:15

据我所知,使用多个线程基本上没有速度优势。

如果有人对我缺少的任何东西有任何想法,请告诉我!我希望能够让它发挥作用。我真的很喜欢 BQ Storage API 的功能(行过滤器、列选择、没有导出限制),但是在我们找到合适的方式来分散读者之前,我们将无法使用它。

标签: google-bigquerymultiprocessing

解决方案


grpcio(由库使用)和多处理存在一个已知问题。google-cloud-bigquery-storage根据此代码示例,“在任何 gRPC 服务器启动之前分叉工作子进程”。

由于您的工作负载主要受 I/O 限制,因此全局解释器锁不应该是主要的性能瓶颈。我建议使用线程来分配工作,就像在library中所做的那样google-cloud-bigquery

代替:

with mp.Pool(processes=ncpus_buffer) as p:
    result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

和:

with concurrent.futures.ThreadPoolExecutor(max_workers=num_streams) as p:
    result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

推荐阅读