python - 在 Dask 中返回数据框
问题描述
目标:加快在大型数据框(190 万~行)中逐行应用函数
尝试:使用 dask map_partitions 其中分区 == 核心数。我编写了一个应用于每一行的函数,创建一个包含可变数量的新值(1 到 55 之间)的 dict。此功能独立工作正常。
问题:我需要一种将每个函数的输出组合成最终数据帧的方法。我尝试使用 df.append,我将每个 dict 附加到一个新的数据帧并返回这个数据帧。如果我了解 Dask Docs,那么 Dask 应该将它们组合成一个大 DF。不幸的是,这条线引发了错误(ValueError: could not broadcast input array from shape (56) into shape (1))。这让我相信这与 Dask 中的组合功能有关?
#Function to applied row wise down the dataframe. Takes a column (post) and new empty df.
def func(post,New_DF):
post = str(post)
scores = OtherFUNC.countWords(post)
scores['post'] = post
New_DF = New_DF.append(scores, ignore_index=True)
return(New_DF)
#Dask
dd.from_pandas(dataset,npartitions=nCores).\
map_partitions(
lambda df : df.apply(
lambda x : func(x.post,New_DF),axis=1)).\
compute(get=get)
解决方案
我不太确定我是否完全理解您的代码来代替MCVE,但我认为这里存在一些误解。
在这段代码中,您获取一行和一个 DataFrame,并将一行附加到该 DataFrame。
#Function to applied row wise down the dataframe. Takes a column (post) and new empty df.
def func(post,New_DF):
post = str(post)
scores = OtherFUNC.countWords(post)
scores['post'] = post
New_DF = New_DF.append(scores, ignore_index=True)
return(New_DF)
New_DF
我建议不要附加到,而是只返回一个pd.Series
连接df.apply
成DataFrame
. 那是因为如果你在所有分区中附加到同一个New_DF
对象nCores
,你一定会遇到麻烦。
#Function to applied row wise down the dataframe. Takes a row and returns a row.
def tobsecret_func(row):
post = str(row.post)
scores = OtherFUNC.countWords(post)
scores['post'] = post
length_adjusted_series = pd.Series(scores).reindex(range(55))
return(length_adjusted_series)
您的错误还表明,正如您在问题中所写,您的函数会创建可变数量的值。如果pd.Series
您返回的形状和列名不同,则df.apply
无法将它们连接成pd.DataFrame
. 因此,请确保您pd.Series
每次都返回一个相同的形状。这个问题向您展示了如何创建pd.Series
等长和索引:Pandas: pad series on top or bottom
我不知道dict
你的OtherFUNC.countWords
回报到底是什么样的,所以你可能想要调整这条线:
length_adjusted_series = pd.Series(scores).reindex(range(55))
照原样,该行将返回一个索引为 0、1、2、...、54 和最多 55 个值的 Series(如果 dict 最初具有少于 55 个键,则剩余单元格将包含NaN
值)。这意味着在应用于 a 之后DataFrame
,该 DataFrame 的列将被命名为 0、1、2、...、54。
现在您将您dataset
的函数映射到每个分区,并在每个分区中将其应用于DataFrame
using apply
。
#Dask
dd.from_pandas(dataset,npartitions=nCores).\
map_partitions(
lambda df : df.apply(
lambda x : func(x.post,New_DF),axis=1)).\
compute(get=get)
map_partitions
期望一个将 DataFrame 作为输入并输出 DataFrame 的函数。您的函数通过使用一个 lambda 函数来执行此操作,该函数基本上调用您的其他函数并将其应用于 DataFrame,而 DataFrame 又返回一个 DataFrame。这可行,但我强烈建议编写一个命名函数,该函数将 DataFrame 作为输入并输出 DataFrame,它使您更容易调试代码。
例如,使用这样的简单包装函数:
df_wise(df):
return df.apply(tobsecret_func)
特别是当您的代码变得更加复杂时,避免使用lambda
像您的自定义那样调用非平凡代码的函数,func
而是制作一个简单的命名函数可以帮助您调试,因为回溯不仅会引导您进入包含一堆 lambda 函数的行,例如在您的代码中,但也将直接指向命名函数df_wise
,因此您将确切地看到错误来自何处。
#Dask
dd.from_pandas(dataset,npartitions=nCores).\
map_partitions(df_wise,
meta=df_wise(dd.head())
).\
compute(get=get)
请注意,我们刚刚创建了我们的元关键字,这dd.head()
类似于df_wise
Dask 在引擎盖下所做的事情。
您正在使用同步调度程序 dask.get,这就是整个 New_DF.append(...) 代码可以工作的原因,因为您为每个连续分区附加到 DataFrame。
这不会为您提供任何并行性,因此如果您使用其他调度程序之一将无法工作,所有这些调度程序都会使您的代码并行化。
文档还提到了meta
关键字参数,您应该将其提供给您的调用map_partitions
,因此 dask 知道您的 DataFrame 将包含哪些列。如果您不这样做,dask 将首先必须在其中一个分区上试运行您的函数并检查输出的形状,然后才能继续执行其他分区。如果您的分区很大,这会使您的代码减慢很多;给出meta
关键字绕过了这个不必要的 dask 计算。
推荐阅读
- excel - 如果目标地址 - 我如何使用范围?
- react-native - 错误:“OneSignalNotificationServiceExtension”需要配置文件。在签名和功能编辑器中选择配置文件
- python - Python用面板控制多个py脚本运行时
- progressive-web-apps - 具有离线后台同步示例的 PWA
- azure - 如何使用 Terraform 创建 Azure 警报
- java - 如何执行仅安装 Java 8 的 Java 11 jar?
- c++ - 如何多次将参数传递给构造函数
- google-api - google people api 格式问题“googleapiclient.errors.UnknownApiNameOrVersion: name: people version: v1”面临的问题
- typescript - Typescript 不会因类型处理不当而产生错误
- nginx - Nginx 负载均衡器无法正常工作一些请求返回 400 bad gateway