首页 > 解决方案 > 延迟 dask.dataframe.DataFrame.to_hdf 计算崩溃

问题描述

我正在使用 Dask 来执行以下逻辑:

如果我compute=False在我的调用中使用并提供每个调用返回to_hdf的 s 列表,那么我会遇到崩溃/段错误。(如果我省略一切运行良好)。一些谷歌搜索给了我一些关于锁的信息;我尝试添加 a和feed to以及 a ,但我无法解决崩溃问题。Delayedto_hdfdask.computecompute=Falsedask.distributed.Clientdask.distributed.Lockto_hdfdask.utils.SerializableLock

这是流程:

import uproot
import dask
import dask.dataframe as dd
from dask.delayed import delayed

def delayed_frame(files, tree_name):
    """create master delayed DataFrame from multiple files"""
    @delayed
    def single_frame(file_name, tree_name):
        """read external file, convert to pandas.DataFrame, return it"""
        tree = uproot.open(file_name).get(tree_name)
        return tree.pandas.df() ## this is the pd.DataFrame
    return dd.from_delayed([single_frame(f, tree_name) for f in files])

def save_selected_frames(df, selections, prefix):
    """perform queries on a delayed DataFrame and save HDF5 output"""
    queries = {sel_name: df.query(sel_query)
               for sel_name, sel_query in selections.items()]
    computes = []
    for dfname, df in queries.items():
        outname = f"{prefix}_{dfname}.h5"
        computes.append(df.to_hdf(outname, f"/{prefix}", compute=False))
    dask.compute(*computes)


selections = {"s1": "(A == True) & (N > 1)",
              "s2": "(B == True) & (N > 2)",
              "s3": "(C == True) & (N > 3)"}

from glob import glob
df = delayed_frame(glob("/path/to/files/*.root"), "selected")
save_selected_frames(df, selections, "selected")

## expect output files:
##  - selected_s1.h5
##  - selected_s2.h5
##  - selected_s3.h5

标签: daskdask-distributeddask-delayed

解决方案


也许您使用的 HDF 库不是完全线程安全的?如果您不介意失去并行性,那么您可以添加scheduler="single-threaded"到计算调用中。

您可能需要考虑使用 Parquet 而不是 HDF。这样的问题少了。


推荐阅读