首页 > 解决方案 > LocalCluster() 如何影响任务数量?

问题描述

是否需要在 LocalCluster 内部或外部进行计算(如 dask 方法 dd.merge)?最终计算(如 .compute)需要在 LocalCluster 内部还是外部进行?

我的主要问题是 - LocalCluster() 如何影响任务数量?

我和我的同事注意到,将 dd.merge 放在 LocalCLuster() 之外会显着降低任务数量(比如 10 倍或类似的东西)。这是什么原因?

伪例子

许多任务:

dd.read_parquet(somewhere, index=False)

with LocalCluster(
        n_workers=8,
        processes=True,
        threads_per_worker=1,
        memory_limit="10GB",
        ip="tcp://localhost:9895",
    ) as cluster, Client(cluster) as client:
 dd.merge(smth)
 smth..to_parquet(
            somewhere, engine="fastparquet", compression="snappy"
        )

几个任务:

dd.read_parquet(somewhere, index=False)
dd.merge(smth)

with LocalCluster(
        n_workers=8,
        processes=True,
        threads_per_worker=1,
        memory_limit="10GB",
        ip="tcp://localhost:9895",
    ) as cluster, Client(cluster) as client:
 
 smth..to_parquet(
            somewhere, engine="fastparquet", compression="snappy"
        )

标签: pythondaskdask-dataframe

解决方案


性能差异是由于使用的调度程序不同造成的。

根据 dask文档

每个 dask 集合都有一个默认调度程序

dask.dataframe 默认使用线程调度器

默认调度程序是在没有注册其他调度程序时使用的。

此外,根据 dask 分布式文档

当我们创建一个 Client 对象时,它会将自己注册为默认的 Dask 调度程序。所有 .compute() 方法将自动开始使用分布式系统。

因此,当在集群的上下文管理器中运行时,计算会隐式使用该调度程序。

一些附加说明:默认调度程序使用的线程数可能比您定义的本地集群多。性能上的显着差异也可能是由于线程调度程序没有引起的进程间通信的开销。有关调度程序的更多信息,请参见此处


推荐阅读