首页 > 解决方案 > 将自定义文件格式读取到 Dask 数据框

问题描述

我有一个巨大的自定义文本文件(无法将整个数据加载到一个熊猫数据框中),我想将其读入 Dask 数据框。我编写了一个生成器来读取和解析块中的数据并创建熊猫数据框。我想将这些 pandas 数据帧加载到一个 dask 数据帧中并对生成的数据帧执行操作(例如创建计算列、提取数据帧的一部分、绘图等)。我尝试使用 Dask bag 但无法成功。所以我决定将生成的数据帧写入 HDFStore,然后使用 Dask 从 HDFStore 文件中读取。当我在自己的电脑上做这件事时,这很有效。代码如下。

cc = read_custom("demo.xyz", chunks=1000) # Generator of pandas dataframes
from pandas import HDFStore
s = HDFStore("demo.h5")
for c in cc:
    s.append("data", c, format='t', append=True)
s.close()

import dask.dataframe as dd
ddf = dd.read_hdf("demo.h5", "data", chunksize=100000)
seqv = (
    (
        (ddf.sxx - ddf.syy) ** 2
        + (ddf.syy - ddf.szz) ** 2
        + (ddf.szz - ddf.sxx) ** 2
        + 6 * (ddf.sxy ** 2 + ddf.syz ** 2 + ddf.sxz ** 2)
    )
    / 2
) ** 0.5
seqv.compute()

由于最后一次计算速度很慢,我决定将它分发到我局域网上的几个系统上,并在我的机器上启动一个调度程序,并在其他系统上启动几个工作人员。并启动Client如下。

from dask.distributed import Client
client = Client('mysystemip:8786') #Establishing connection with the scheduler all fine.

然后读入 Dask 数据框。但是,当我执行seqv.compute().

HDF5ExtError: HDF5 error back trace

  File "H5F.c", line 509, in H5Fopen
    unable to open file
  File "H5Fint.c", line 1400, in H5F__open
    unable to open file
  File "H5Fint.c", line 1615, in H5F_open
    unable to lock the file
  File "H5FD.c", line 1640, in H5FD_lock
    driver lock request failed
  File "H5FDsec2.c", line 941, in H5FD_sec2_lock
    unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'

End of HDF5 error back trace

Unable to open/create file 'demo.h5'

我已确保所有员工都可以访问demo.h5文件。我尝试传入lock=Falsein read_hdf。得到同样的错误。

这不可能吗?可以尝试其他文件格式吗?我想将每个 pandas 数据帧写入单独的文件可能会起作用,但我试图避免它(我什至不想要一个中间 HDFS 文件)。但在我到达那条路线之前,我想知道是否有其他更好的方法来解决这个问题。

感谢您的任何建议!

标签: pythonpandasdataframehdfsdask

解决方案


如果您想从文本文件中的自定义格式读取数据,我建议使用该dask.bytes.read_bytes函数,该函数返回延迟对象列表,每个对象都指向文件中的一个字节块。默认情况下,这些块将由行分隔符干净地分隔。

像这样的东西可能会起作用:

def parse_bytes(b: bytes) -> pandas.DataFrame:
    ...

blocks = dask.bytes.read_bytes("my-file.txt", delimiter=b"\n")
dataframes = [dask.delayed(parse_bytes)(block) for block in blocks]
df = dask.dataframe.from_delayed(dataframes)

推荐阅读