How to use a pandas DataFrame in shared memory during multiprocessing?

Problem description

An answer to "Is shared readonly data copied to different processes for multiprocessing?" gives a working solution for sharing a numpy array via shared memory.
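For reference, the pattern from that answer looks roughly like this (a minimal sketch assuming Python 3.8+, which provides multiprocessing.shared_memory; the array contents and variable names are illustrative):

from multiprocessing import shared_memory
import numpy as np

# parent: allocate a shared block and back a numpy array with it
a = np.arange(6, dtype=np.int64)
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
shared_a = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
shared_a[:] = a[:]

# child: attach to the same block by name and rebuild the view
existing = shared_memory.SharedMemory(name=shm.name)
b = np.ndarray(a.shape, dtype=a.dtype, buffer=existing.buf)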

What would the equivalent look like for a pandas DataFrame?

Background: I want to be able to write to the DataFrame during multiprocessing, and to process it further once the multiprocessing has finished.

Tags: python, pandas, multiprocessing, python-multiprocessing, python-multithreading

Solution


If you don't want to use dask, you can share a pandas dataframe using shared memory by first converting it to a numpy array and then reconstructing it in the child processes.

import numpy as np
import pandas as pd
from multiprocessing import shared_memory

def create_shared_block(to_share, dtypes):
    # downcast float64 columns to float32 so the shared block stays small
    for col, dtype in to_share.dtypes.items():
        if dtype == 'float64':
            to_share[col] = pd.to_numeric(to_share[col], downcast='float')

    # turn the index into a regular column, then drop the auto-named
    # 'index' column that appears when the index was unnamed
    to_share.reset_index(inplace=True)
    to_share = to_share.drop('index', axis=1, errors='ignore')
    
    # get the dtypes in the same order as the dataframe columns and make sure the types are correct for numpy
    dtypes_sorted = sort_dtypes(to_share, dtypes)
    
    # get the dataframe values in the format expected by numpy
    values = [tuple(x) for x in to_share.values.tolist()]
    
    # create a numpy array
    to_share = np.array(values, dtype=dtypes_sorted)
    
    # create a shared memory of the size of the array
    shm = shared_memory.SharedMemory(create=True, size=to_share.nbytes)
    
    # now create a NumPy array backed by shared memory
    np_array = np.ndarray(to_share.shape, dtype=dtypes_sorted, buffer=shm.buf)
    
    # Copy the original data into shared memory
    np_array[:] = to_share[:]
    return shm, np_array, dtypes_sorted


def sort_dtypes(df, dtypes):
    # 'category' is a pandas dtype, not numpy; 'str' and '|S' need a fixed
    # width, so size byte-string columns to the longest value they hold
    string_types = ('category', 'object', 'str', '|S')
    dtypes = [(x, '|S{}'.format(df[x].str.len().max())) if y in string_types
              else (x, y)
              for x, y in dtypes if x in df.columns]
    # build a lookup
    dtypes_dict = {x: y for x, y in dtypes}
    # reorder to match the dataframe's column order
    dtypes_sorted = [(x, dtypes_dict[x]) for x in df.columns]
    return dtypes_sorted

# ------PARENT PROCESS-------#
# create your shared memory
to_share = pd.DataFrame([['obstacle', 'obstacle', 2, 3],
                         ['obstacles', 'obstacle', 2, np.nan]],
                        columns=['w1', 'w2', 'd1', 'd2'])
dtypes = [('w1','str'),('w2','|S'),('d1','f'),('d2','f')]
shm, arr, dtypes_sorted = create_shared_block(to_share, dtypes)

# then pass these values to your child processes
shared = (shm.name, arr.shape, dtypes_sorted)

# ------CHILD PROCESS-------#
# assuming the tuple above arrives in the child process in a variable
# called shared, you can reconstruct the dataframe as follows
existing_shm = shared_memory.SharedMemory(name=shared[0])
np_array = np.ndarray(shared[1], dtype=shared[2], buffer=existing_shm.buf)
columns = [x for x, y in shared[2]]
df = pd.DataFrame(np_array, columns=columns)
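
For completeness, here is one way to wire the two halves together (a minimal sketch: the worker function is hypothetical, shared is the tuple built above, and on platforms that spawn rather than fork, the parent-side code also needs the usual if __name__ == '__main__' guard; the close/unlink cleanup follows the multiprocessing.shared_memory docs):

from multiprocessing import Process

def worker(shm_name, shape, dtypes_sorted):
    # re-attach to the block by name and rebuild the dataframe, as above
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    np_array = np.ndarray(shape, dtype=dtypes_sorted, buffer=existing_shm.buf)
    df = pd.DataFrame(np_array, columns=[x for x, y in dtypes_sorted])
    print(df)
    existing_shm.close()  # detach; the parent still owns the block

p = Process(target=worker, args=shared)  # shared = (shm.name, arr.shape, dtypes_sorted)
p.start()
p.join()

# once every process is done with the block, release it
shm.close()
shm.unlink()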

When sharing a 100k-row dataframe, this saved some memory in my application, though probably not as much as an established library like dask would. I am also not too sure about the overhead involved in re-creating the pandas dataframe; I assume it just references the shared numpy array and adds some extras on top to make it a df.
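
One way to test that assumption instead of guessing is to ask numpy whether the rebuilt frame still points into the shared buffer (a quick check; 'd1' is just one field from the example above):

# True would mean the column is a view into the shared block;
# False would mean pandas copied the data while building the df
print(np.shares_memory(np_array, df['d1'].to_numpy()))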

