首页 > 解决方案 > 如何在 dask 分布式集群中使用 dask_ml 预处理

问题描述

如何在 dask 分布式集群中进行 dask_ml 预处理?我的数据集大约 200GB,每次我对准备 OneHotEncoding 的数据集进行分类时,看起来 dask 都忽略了客户端并尝试将数据集加载到本地机器的内存中。也许我错过了一些东西:

from dask_ml.preprocessing import Categorizer, DummyEncoder

from sklearn.linear_model import LogisticRegression

from sklearn.pipeline import make_pipeline

import pandas as pd

import dask.dataframe as dd

df = dd.read_csv('s3://some-bucket/files*.csv', dtypes={'column': 'category'})
    
pipe = make_pipeline(
   Categorizer(),
   DummyEncoder(),
   LogisticRegression(solver='lbfgs')
)


pipe.fit(df, y)

标签: daskdask-distributeddask-delayeddask-dataframedask-ml

解决方案


需要立即解决的两件事:

  • 您尚未在代码中实例化分布式调度程序。
  • 您可能应该使用LogisticRegressionfrom dask-ml而不是scikit-learn.

工作代码示例

下面是一个有效的最小代码示例。

请注意,预处理函数仅接受 Dask 数据帧,而 LogisticRegression 估计器仅接受 Dask 数组。您可以拆分管道或使用自定义FunctionTransformer(来自this answer)。有关更多上下文,请参阅此开放的 Dask 问题

from dask_ml.preprocessing import Categorizer, DummyEncoder
from dask_ml.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

from dask_ml.datasets import make_classification
X, y = make_classification(chunks=50)

# define custom transformers to include in pipeline
def trans_array(array):
    return dd.from_array(array)
transform_array = FunctionTransformer(trans_array)

def trans_df(dataframe):
    return dataframe.to_dask_array(lengths=True)
transform_df = FunctionTransformer(trans_df)

pipe = make_pipeline(
    transform_array,
    Categorizer(),
    DummyEncoder(),
    transform_df,
    LogisticRegression(solver='lbfgs')
)

pipe.fit(X,y)


推荐阅读