首页 > 解决方案 > Python dask groupby 不适用于“进程”调度程序

问题描述

我正在尝试使用以下代码使用 dask 对 pandas 数据帧并行化 groupby 应用。

import pandas as pd
import dask.dataframe as dd
def dummy_function(df):
    """
    This function doing some python calculations
    and manipulation to given dataframe
    """
    df["new_column"] = df["existing_column"]
    return df
given_df = pd.DataFrame({"Phone_no": ["123", "234", "123", "578"], "City": ["ABC", "BCD", "ABC", "EFG"]})
ddf = dd.from_pandas(given_df, npartitions=2)

工作代码

output_df = ddf.groupby("Phone_no").apply(dummy_function).compute()

尽管这段代码正在运行,但所有内核都没有被使用,但在做了一些研究后,我发现由于 dask 的默认计算调度程序是线程化的,所以这是由于 python 的全局解释器锁 (GIL),所有内核都没有被使用。更多细节在这里。 https://realpython.com/python-gil/

因此我尝试使用“进程”调度程序。

output_df = ddf.groupby("Phone_no").apply(dummy_function).compute(scheduler="processes")

但这是返回以下错误

NotImplementedError('object proxy must define __reduce_ex__()')

我相信多处理在某种程度上使用泡菜,因此这个错误是从那里产生的。我能找到的最接近的相关问题是这个

https://github.com/GrahamDumpleton/wrapt/issues/102#issue-227792648

其中一条评论建议了以下解决方案

https://github.com/GrahamDumpleton/wrapt/issues/102#issuecomment-456528633

我的问题

上述解决方案是否有效?如果是,那么如何实现它以及我应该在哪里进行编辑或定义 __reduce__ex 函数?

我什至尝试在 wrapt 模块中编辑 ObjectProxy 类,但不知何故无法使其工作。

或者有没有其他方法可以使进程调度程序工作?

或者以 Dask 以外的任何其他方式进行多处理?

标签: pythonpicklepython-multiprocessingdask

解决方案


您可以尝试查看dask.distributed以将任务提交给调度程序以在工作人员上运行。

您需要dask-scheduler从外壳启动您的。

dask-scheduler &

dask-scheduler然后,从 shell中选择您打算使用的仍然地址的工人数量:

dask-worker 172.17.0.2:8786 & 

然后,在您的脚本中,您client通过传递调度程序的 IP 和端口来创建实例。然后,您可以使用此客户端实例通过dask-scheduler.

client=Client('localhost:8786')

然后,您可以计算结果

output_df=client.submit(result)
client.shutdown()

推荐阅读