dask - 如何在 dask 分布式集群中使用 dask_ml 预处理
问题描述
如何在 dask 分布式集群中进行 dask_ml 预处理?我的数据集大约 200GB,每次我对准备 OneHotEncoding 的数据集进行分类时,看起来 dask 都忽略了客户端并尝试将数据集加载到本地机器的内存中。也许我错过了一些东西:
from dask_ml.preprocessing import Categorizer, DummyEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
import pandas as pd
import dask.dataframe as dd
df = dd.read_csv('s3://some-bucket/files*.csv', dtypes={'column': 'category'})
pipe = make_pipeline(
Categorizer(),
DummyEncoder(),
LogisticRegression(solver='lbfgs')
)
pipe.fit(df, y)
解决方案
需要立即解决的两件事:
- 您尚未在代码中实例化分布式调度程序。
- 您可能应该使用
LogisticRegression
fromdask-ml
而不是scikit-learn
.
工作代码示例
下面是一个有效的最小代码示例。
请注意,预处理函数仅接受 Dask 数据帧,而 LogisticRegression 估计器仅接受 Dask 数组。您可以拆分管道或使用自定义FunctionTransformer
(来自this answer)。有关更多上下文,请参阅此开放的 Dask 问题。
from dask_ml.preprocessing import Categorizer, DummyEncoder
from dask_ml.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
from dask_ml.datasets import make_classification
X, y = make_classification(chunks=50)
# define custom transformers to include in pipeline
def trans_array(array):
return dd.from_array(array)
transform_array = FunctionTransformer(trans_array)
def trans_df(dataframe):
return dataframe.to_dask_array(lengths=True)
transform_df = FunctionTransformer(trans_df)
pipe = make_pipeline(
transform_array,
Categorizer(),
DummyEncoder(),
transform_df,
LogisticRegression(solver='lbfgs')
)
pipe.fit(X,y)
推荐阅读
- python - 为什么我会得到反向双向链表?
- java - 启用 java 安全管理器时获取系统属性“os.arch”时权限被拒绝
- python - Lambda 返回 ConditionalCheckFailedException 而不是更新项目
- styled-components - 样式化的组件输出为 CSS 文件
- python - 如何使用 python 和 pandas 运行 sql 查询
- angular - 角度垫选择,将选定的值发送到函数
- javascript - 是否有必要将异步放入蓝鸟承诺中
- php - 如何保护作曲家创建/导入的文件?
- postgresql - AWS DMS CDC 任务未检测到列名称和类型更改
- python - 在实现 `unittest.TestCase` 的类中使用 `__name__ == "__main__":`