首页 > 解决方案 > 如何将 dask 数据框保存到与 dask sheduler/workers 相同的机器上?

问题描述

我正在尝试通过 Dask Dataframe 保存到与 dask 调度程序/工作人员所在的同一台机器上的镶木地板。但是,在此期间我遇到了麻烦。

我的 Dask 设置:我的 python 脚本在我的本地机器(笔记本电脑 16 GB RAM)上执行,但该脚本为远程机器上运行的 Dask 调度程序创建了 Dask 客户端(用于并行计算的服务器具有 400 GB RAM)。Dask 调度程序和工作程序都位于同一台服务器上,因此它们都共享相同的文件系统,可供它们在本地使用。由于我团队的所有成员都使用这个远程 Dask 调度程序,我们正在处理的文件也位于同一台服务器上,通过同一个 Dask 集群为所有成员提供对所有文件的共同访问。

我努力了:

# This saves the parquet files in a folder on my local machine.
ddf.to_parquet(
    '/scratch/dataset_no_dalayed', compression='brotli').compute()

# This delayed call of `ddf.to_parquet` saves the Dask Dataframe chucks
# into individual parquet files (i.e. parts) in the given folder.
# However, I want to persist the Dask dataframe in my workflow, but this
# fails as seen below.
dask.delayed(ddf.to_parquet)(
    '/scratch/dataset_dalayed', compression='brotli').compute()

# If the Dask dataframe is persisted, the `to_parquet` fails with
# a "KilledWorker" error!
ddf = client.persist(ddf)
dask.delayed(ddf.to_parquet)(
    '/scratch/dataset_persist/', compression='brotli').compute()

# In the example below, I can NOT save the Dask dataframe.
# Because the delayed function makes the Dask dataframe
# to a Pandas dataframe on runtime. And this fails as the path is a
# folder and not at file as Pandas requires!
@dask.delayed
def save(new_ddf):
    new_ddf.to_parquet('/scratch/dataset_function/', compression='brotli')

save(ddf).compute()

如何正确执行此操作?

标签: pythondaskparquet

解决方案


通常要将 dask 数据框保存为镶木地板数据集,人们会执行以下操作:

df.to_parquet(...)

从您的问题来看,您的工作人员可能并非都可以访问 NFS 或 S3 等共享文件系统。如果是这种情况并且您存储到本地驱动器,那么您的数据将分散在各种机器上,而没有明显的方法将它们收集在一起。原则上,我鼓励您避免这种情况,并投资于共享文件系统。它们在进行分布式计算时非常有用。

如果你不能这样做,那么我个人可能会并行写入本地驱动器,然后将它们 scp 回一台机器。

如果您的数据集足够小,那么您也可以调用.compute以返回本地 Pandas 数据框,然后使用 Pandas 编写

df.compute().to_parquet(...)

推荐阅读