首页 > 解决方案 > 在 Dask 中返回数据框

问题描述

目标:加快在大型数据框(190 万~行)中逐行应用函数

尝试:使用 dask map_partitions 其中分区 == 核心数。我编写了一个应用于每一行的函数,创建一个包含可变数量的新值(1 到 55 之间)的 dict。此功能独立工作正常。

问题:我需要一种将每个函数的输出组合成最终数据帧的方法。我尝试使用 df.append,我将每个 dict 附加到一个新的数据帧并返回这个数据帧。如果我了解 Dask Docs,那么 Dask 应该将它们组合成一个大 DF。不幸的是,这条线引发了错误(ValueError: could not broadcast input array from shape (56) into shape (1))。这让我相信这与 Dask 中的组合功能有关?

#Function to applied row wise down the dataframe. Takes a column (post) and new empty df. 
def func(post,New_DF):
    post = str(post)
    scores = OtherFUNC.countWords(post)
    scores['post'] = post
    New_DF = New_DF.append(scores, ignore_index=True)
    return(New_DF)

#Dask 
dd.from_pandas(dataset,npartitions=nCores).\
 map_partitions(
      lambda df : df.apply(
         lambda x : func(x.post,New_DF),axis=1)).\
   compute(get=get)

标签: pythonpandasdask

解决方案


我不太确定我是否完全理解您的代码来代替MCVE,但我认为这里存在一些误解。

在这段代码中,您获取一行和一个 DataFrame,并将一行附加到该 DataFrame。

#Function to applied row wise down the dataframe. Takes a column (post) and new empty df. 
def func(post,New_DF):
    post = str(post)
    scores = OtherFUNC.countWords(post)
    scores['post'] = post
    New_DF = New_DF.append(scores, ignore_index=True)
    return(New_DF)

New_DF我建议不要附加到,而是只返回一个pd.Series连接df.applyDataFrame. 那是因为如果你在所有分区中附加到同一个New_DF对象nCores,你一定会遇到麻烦。

 #Function to applied row wise down the dataframe. Takes a row and returns a row. 
def tobsecret_func(row):
    post = str(row.post)
    scores = OtherFUNC.countWords(post)
    scores['post'] = post
    length_adjusted_series = pd.Series(scores).reindex(range(55))
    return(length_adjusted_series)

您的错误还表明,正如您在问题中所写,您的函数会创建可变数量的值。如果pd.Series您返回的形状和列名不同,则df.apply无法将它们连接成pd.DataFrame. 因此,请确保您pd.Series每次都返回一个相同的形状。这个问题向您展示了如何创建pd.Series等长和索引:Pandas: pad series on top or bottom

我不知道dict你的OtherFUNC.countWords回报到底是什么样的,所以你可能想要调整这条线: length_adjusted_series = pd.Series(scores).reindex(range(55))

照原样,该行将返回一个索引为 0、1、2、...、54 和最多 55 个值的 Series(如果 dict 最初具有少于 55 个键,则剩余单元格将包含NaN值)。这意味着在应用于 a 之后DataFrame,该 DataFrame 的列将被命名为 0、1、2、...、54。

现在您将您dataset的函数映射到每个分区,并在每个分区中将其应用于DataFrameusing apply

#Dask 
dd.from_pandas(dataset,npartitions=nCores).\
 map_partitions(
      lambda df : df.apply(
         lambda x : func(x.post,New_DF),axis=1)).\
   compute(get=get)

map_partitions期望一个将 DataFrame 作为输入并输出 DataFrame 的函数。您的函数通过使用一个 lambda 函数来执行此操作,该函数基本上调用您的其他函数并将其应用于 DataFrame,而 DataFrame 又返回一个 DataFrame。这可行,但我强烈建议编写一个命名函数,该函数将 DataFrame 作为输入并输出 DataFrame,它使您更容易调试代码。

例如,使用这样的简单包装函数:

df_wise(df):
    return df.apply(tobsecret_func)

特别是当您的代码变得更加复杂时,避免使用lambda像您的自定义那样调用非平凡代码的函数,func而是制作一个简单的命名函数可以帮助您调试,因为回溯不仅会引导您进入包含一堆 lambda 函数的行,例如在您的代码中,但也将直接指向命名函数df_wise,因此您将确切地看到错误来自何处。

#Dask 
dd.from_pandas(dataset,npartitions=nCores).\
 map_partitions(df_wise, 
                meta=df_wise(dd.head())
                ).\
   compute(get=get)

请注意,我们刚刚创建了我们的元关键字,这dd.head()类似于df_wiseDask 在引擎盖下所做的事情。

您正在使用同步调度程序 dask.get,这就是整个 New_DF.append(...) 代码可以工作的原因,因为您为每个连续分区附加到 DataFrame。

这不会为您提供任何并行性,因此如果您使用其他调度程序之一将无法工作,所有这些调度程序都会使您的代码并行化。

文档还提到了meta关键字参数,您应该将其提供给您的调用map_partitions,因此 dask 知道您的 DataFrame 将包含哪些列。如果您不这样做,dask 将首先必须在其中一个分区上试运行您的函数并检查输出的形状,然后才能继续执行其他分区。如果您的分区很大,这会使您的代码减慢很多;给出meta关键字绕过了这个不必要的 dask 计算。


推荐阅读