首页 > 解决方案 > 当熊猫数据框超过 3GB 时 Pyarrow.flight.do_get 段错误

问题描述

我的飞行服务器中有两个数据框:v1 和 v2。V1 很小,v2 大约 3gb。我可以成功地向服务器请求 v1,但是在请求 v2 时会发生段错误。

import numpy.random as rnd
import pandas as pd
import pyarrow as pa
import pyarrow.flight as fl
import numpy as np


class MyFlightServer(FlightServerBase):
    def __init__(self, location=None, options=None, **kwargs):
        super().__init__(location, **kwargs)
        self.tables = {}
        rng = rnd.default_rng()
        df = pd.DataFrame(np.random.standard_normal((1000, 5))).rename(
            columns={k: "col" + str(k) for k in range(5)}
        )
        self.tables[b"v1"] = pa.Table.from_pandas(df)

        df2 = pd.DataFrame(np.random.standard_normal((100000000, 5))).rename(
            columns={k: "col" + str(k) for k in range(5)}
        )
        print(df2.info())
        self.tables[b"v2"] = pa.Table.from_pandas(df2)

    def do_get(self, context, ticket):
        return RecordBatchStream(self.tables[ticket.ticket])


def main():
    with MyFlightServer() as server:
        # This works
        client = fl.connect(("localhost", server.port))
        data = client.do_get(fl.Ticket("v1")).read_pandas()

        # This will get a seg fault
        data = client.do_get(fl.Ticket("v2")).read_pandas()


main()

以上产生以下输出(可能与 df 细节有关):

RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
 #   Column  Dtype  
---  ------  -----  
 0   col0    float64
 1   col1    float64
 2   col2    float64
 3   col3    float64
 4   col4    float64
dtypes: float64(5)
memory usage: 3.7 GB
None
Segmentation fault (core dumped)

运行它的机器有 64GB RAM,其中大约 15GB 在运行时正在使用。因此,我打折(也许是天真地)它找不到连续的内存块的可能性。

我是在滥用飞行服务器/记录批处理流媒体还是这可能是一个错误?

涉及的版本:

numpy==1.21.0
pandas==1.2.5
pyarrow==4.0.1
python-dateutil==2.8.1
pytz==2021.1
six==1.16.0

Python 3.9.5 (default, May 19 2021, 11:32:47) 
[GCC 9.3.0]

标签: pythonpyarrow

解决方案


这是 Arrow Flight 中的一个错误。有关详细信息,请参阅ARROW-13253。本质上,这是因为 Flight 处理程序将数据作为单个 RecordBatch 发送,但目前,Flight 不支持发送大小 > 2GiB 的记录批次。但是,内部序列化处理程序在没有初始化输出缓冲区的情况下返回错误,gRPC 试图在不检查错误的情况下盲目操作,从而导致崩溃。

您可以通过显式分块数据来解决此问题:

def do_get(self, context, ticket):
    table = self.tables[ticket.ticket]
    batches = table.to_batches(max_chunksize=65536)
    return fl.GeneratorStream(table.schema, batches)

推荐阅读