python-3.x - Dask 应用自定义功能
问题描述
我正在尝试使用 Dask,但是apply
在分组后使用时遇到了问题。
我有一个包含大量行的 Dask DataFrame。让我们考虑以下示例
N=10000
df = pd.DataFrame({'col_1':np.random.random(N), 'col_2': np.random.random(N) })
ddf = dd.from_pandas(df, npartitions=8)
我想对 的值进行分类,并从这里col_1
遵循解决方案
bins = np.linspace(0,1,11)
labels = list(range(len(bins)-1))
ddf2 = ddf.map_partitions(test_f, 'col_1',bins,labels)
在哪里
def test_f(df,col,bins,labels):
return df.assign(bin_num = pd.cut(df[col],bins,labels=labels))
这正如我所期望的那样工作。
现在我想取每个 bin 中的中值(取自这里)
median = ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute()
有 10 个 bin,我希望median
有 10 行,但实际上有 80 行。数据框有 8 个分区,所以我猜想应用程序以某种方式单独处理每个分区。
但是,如果我想要平均值并使用mean
median = ddf2.groupby('bin_num')['col_1'].mean().compute()
它可以工作,输出有 10 行。
那么问题是:我做错了什么阻止apply
了操作mean
?
解决方案
你说的对!我能够在 Dask 2.11.0 上重现您的问题。好消息是有一个解决方案!Dask groupby 问题似乎与类别类型(pandas.core.dtypes.dtypes.CategoricalDtype)有关。通过将类别列转换为另一种列类型(float、int、str),groupby 将正常工作。
这是我复制的您的代码:
import dask.dataframe as dd
import pandas as pd
import numpy as np
def test_f(df, col, bins, labels):
return df.assign(bin_num=pd.cut(df[col], bins, labels=labels))
N = 10000
df = pd.DataFrame({'col_1': np.random.random(N), 'col_2': np.random.random(N)})
ddf = dd.from_pandas(df, npartitions=8)
bins = np.linspace(0,1,11)
labels = list(range(len(bins)-1))
ddf2 = ddf.map_partitions(test_f, 'col_1', bins, labels)
print(ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())
打印出你提到的问题
bin_num
0 NaN
1 NaN
2 NaN
3 NaN
4 NaN
...
5 0.550844
6 0.651036
7 0.751220
8 NaN
9 NaN
Name: col_1, Length: 80, dtype: float64
这是我的解决方案:
ddf3 = ddf2.copy()
ddf3["bin_num"] = ddf3["bin_num"].astype("int")
print(ddf3.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())
其中打印:
bin_num
9 0.951369
2 0.249150
1 0.149563
0 0.049897
3 0.347906
8 0.847819
4 0.449029
5 0.550608
6 0.652778
7 0.749922
Name: col_1, dtype: float64
@MRocklin 或 @TomAugspurger 您能否在新版本中为此创建修复程序?我认为这里有足够的可重现代码。感谢您的辛勤工作。我喜欢 Dask 并且每天都使用它;)
推荐阅读
- javascript - 如何自动检测具有相同类名的当前表单元素
- c# - 构建错误 - Roslyn MSB3883:意外异常: System.Security.Principal.Windows 的 System.IO.FileNotFoundException
- python - Python shutil.copytree 说它应该复制的目标文件丢失
- reactjs - 语义用户界面反应选择表格单元格不起作用
- vue.js - 使用 vuex 和 vue-router 时未定义的数据
- r - R:动态调整输出pdf大小以绘制函数内的区域
- reactjs - 通过 electron 调用 watson 的创建分类器 API
- google-cloud-bigtable - opentsdb 查询响应中返回的标签值为“node”是什么意思?
- javascript - 使用量角器关闭窗口弹出窗口
- java - 您如何确定何时在运行时调用接口方法?