python - 如何在多处理期间在共享内存中使用 pandas DataFrame?
问题描述
在一个答案中:共享只读数据是否复制到不同的进程以进行多处理?给出了一个用于 numpy 数组的共享内存的工作解决方案。
如果应该使用 pandas DataFrame 会是什么样子?
背景:我希望能够在多处理期间写入 DataFrame,并希望能够在多处理完成后进一步处理它。
解决方案
如果您不想使用 dask,则可以使用共享内存共享 pandas 数据帧,方法是首先将其转换为 numpy 数组,然后在子进程中重建它。
from multiprocessing import shared_memory
def create_shared_block(to_share, dtypes):
# float64 can't be pickled
for col, dtype in to_share.dtypes.items():
if dtype == 'float64':
to_share[col] = pd.to_numeric(to_share[col], downcast='float')
# make the dataframe a numpy array
to_share.reset_index(inplace=True)
# drop the index if named index
to_share = to_share.drop('index', axis=1)
# get the dtypes in the same order as the dataframe columns and make sure the types are correct for numpy
dtypes_sorted = sort_dtypes(to_share, dtypes)
# get the dataframe values in the format expected by numpy
values = [tuple(x) for x in to_share.values.tolist()]
# create a numpy array
to_share = np.array(values, dtype=(dtypes_sorted))
# create a shared memory of the size of the array
shm = shared_memory.SharedMemory(create=True, size=to_share.nbytes)
# now create a NumPy array backed by shared memory
np_array = np.ndarray(to_share.shape, dtype=dtypes_sorted, buffer=shm.buf)
# Copy the original data into shared memory
np_array[:] = to_share[:]
return shm, np_array, dtypes_sorted
def sort_dtypes(df, dtypes):
# category is a pandas dtype, not numpy
string_types = ('category', 'object', '|S')
dtypes = [(x, '|S{}'.format(df[x].str.len().max())) if y in string_types else (x, y) for x, y in dtypes if
x in df.columns]
# build a lookup
dtypes_dict = {x: y for x, y in dtypes}
# fix the order
dtypes_sorted = [(x, dtypes_dict[x]) for x in df.columns]
return dtypes_sorted
# ------PARENT PROCESS-------#
# create your shared memory
to_share = pd.DataFrame([['obstacle','obstacle',2,3],['obstacles','obstacle',2,np.nan]],columns=['w1','w2','d1','d2'])
dtypes = [('w1','str'),('w2','|S'),('d1','f'),('d2','f')]
shm, arr, dtypes_sorted = create_shared_block(to_share, dtypes)
# then pass these values to your child processes
shared = (shm.name, arr.shape, dtypes_sorted)
# ------CHILD PROCESS-------#
# assuming you have passed to the child process in a variable called shared, you can reconstruct the dataframe as follows
shared_memory = shared_memory.SharedMemory(name=shared[0])
np_array = np.ndarray(shared[1], dtype=shared[2], buffer=shared_memory.buf)
columns = [x for x, y in shared[2]]
df = pd.DataFrame(np_array, columns=columns)
在共享 100k 行数据帧时,这在我的应用程序中节省了一些内存,但可能不如使用 dask 等已建立的库可以节省的那么多。而且我不太确定重新创建 pandas 数据框所涉及的开销——我想它只是引用了共享的 numpy 数组并在顶部添加了一些额外的东西以使其成为 df。
推荐阅读
- javascript - 如何在 UseEffect 中创建 redux 存储,正确的模式?
- java - 修正java中的公式
- ruby-on-rails - 线是做什么的?`:@request.env["devise.mapping"] = Devise.mappings[:user] `
- c - 如何查找 int 值是否包含特定数字?
- linux - 你如何获得一个shell脚本来运行一个没有输入的函数
- express - 如何修复阻止注册用户登录的护照本地身份验证错误
- javascript - 如何在按钮被点击的情况下找到最近的tr
- django - 为什么我收到此错误“__str__ 返回非字符串”Django
- kubernetes - 如何为“kubectl port-forward”设置正确的端口(奇怪地转到 localhost:8080)
- python - 编辑已发送的嵌入机器人消息 discord.py