python - Python dask groupby 不适用于“进程”调度程序
问题描述
我正在尝试使用以下代码使用 dask 对 pandas 数据帧并行化 groupby 应用。
import pandas as pd
import dask.dataframe as dd
def dummy_function(df):
"""
This function doing some python calculations
and manipulation to given dataframe
"""
df["new_column"] = df["existing_column"]
return df
given_df = pd.DataFrame({"Phone_no": ["123", "234", "123", "578"], "City": ["ABC", "BCD", "ABC", "EFG"]})
ddf = dd.from_pandas(given_df, npartitions=2)
工作代码:
output_df = ddf.groupby("Phone_no").apply(dummy_function).compute()
尽管这段代码正在运行,但所有内核都没有被使用,但在做了一些研究后,我发现由于 dask 的默认计算调度程序是线程化的,所以这是由于 python 的全局解释器锁 (GIL),所有内核都没有被使用。更多细节在这里。 https://realpython.com/python-gil/
因此我尝试使用“进程”调度程序。
output_df = ddf.groupby("Phone_no").apply(dummy_function).compute(scheduler="processes")
但这是返回以下错误
NotImplementedError('object proxy must define __reduce_ex__()')
我相信多处理在某种程度上使用泡菜,因此这个错误是从那里产生的。我能找到的最接近的相关问题是这个
https://github.com/GrahamDumpleton/wrapt/issues/102#issue-227792648
其中一条评论建议了以下解决方案
https://github.com/GrahamDumpleton/wrapt/issues/102#issuecomment-456528633
我的问题:
上述解决方案是否有效?如果是,那么如何实现它以及我应该在哪里进行编辑或定义 __reduce__ex 函数?
我什至尝试在 wrapt 模块中编辑 ObjectProxy 类,但不知何故无法使其工作。
或者有没有其他方法可以使进程调度程序工作?
或者以 Dask 以外的任何其他方式进行多处理?
解决方案
您可以尝试查看dask.distributed
以将任务提交给调度程序以在工作人员上运行。
您需要dask-scheduler
从外壳启动您的。
dask-scheduler &
dask-scheduler
然后,从 shell中选择您打算使用的仍然地址的工人数量:
dask-worker 172.17.0.2:8786 &
然后,在您的脚本中,您client
通过传递调度程序的 IP 和端口来创建实例。然后,您可以使用此客户端实例通过dask-scheduler
.
client=Client('localhost:8786')
然后,您可以计算结果
output_df=client.submit(result)
client.shutdown()
推荐阅读
- scikit-learn - 尽管有足够的内存,但由于内核不断被杀死,无法在 JupyterLab 上运行 auto-sklearn
- javascript - 如何根据我的时区以特定格式获取日期
- c# - 仅使用名称和类型创建 SqlParameter(未分配值)
- c# - 使用 Microsoft.Owin 验证 SSL 证书
- apache-spark - 如何从 Spark 读取的文件中显示行号?
- node.js - 是什么导致此 Mocha 测试超时?
- c# - 使用 API v3 和 .Net 将 DWG 转换为 DXF
- swift - Xcode 调试器不显示某些变量的值,具体取决于范围
- html - 如何创建垂直/水平嵌套 CSS 菜单?
- c# - ASP.NET Core 如何传递 List
从视图到控制器