首页 > 解决方案 > Dask 以编程方式启动远程工作人员

问题描述

我需要以编程方式创建远程工作者并将它们用于任务,然后将它们关闭。

文档中给出的示例代码适用于所写的内容:

import asyncio
from distributed import Worker, Scheduler, Client
from distributed.scheduler import WorkerState

s = "x.x.x.x:8786" # remote IP, not local, started from command line.

async def f():
    async with Worker(s) as w1, Worker(s) as w2:
        async with Client(s, asynchronous=True) as client:
            future = client.submit(lambda x: x + 1, 10)
            result = await future
            print(result)

asyncio.get_event_loop().run_until_complete(f())

假设我有n不同的机器,而不是 dask scheduler - ip1, ip2, ..ipn。现在,我面临两个问题:

  1. 连接到远程调度程序后,我想在多台机器上创建工作人员。比方说ip1, ip2, ip3。尝试在创建中同时使用hostcontact_address参数。Worker工人从调度程序的本地本身开始,而不是所需的机器。如何在连接到同一调度程序的所需机器上远程启动工作人员?
  2. 我需要clientasync函数中创建以随时间在多个submit, map, 调用中使用。我也有许多自定义 python 函数。那么,我如何以编程方式在不同的机器上创建工作程序,创建一个client并在异步函数之外随着时间的推移使用它。我尝试跟随,不成功。

s_address = "x.x.x.x:8786" # remote scheduler IP

async def f():
    async with Worker(s_address) as w1, Worker(s_address) as w2:
        async with Client(s_address, asynchronous=True) as client:
            return client

client_to_use = f() # expecting client object which I can use and...
                    # ...when everything finishes, hoping context manager kills the workers.
                    # This clearly doesn't work
asyncio.get_event_loop().run_until_complete(f()) # not sure if this is valid anymore

# What I need to do
custom_module.call_some_fn_to_use_dask_client(client_to_use) # Does not work as well!! ```

标签: pythondaskdask-distributed

解决方案


您应该阅读可用的各种选项来设置 dask。简而言之,您需要一种与要在其上启动工作人员的机器对话的方法。调度器不知道怎么做,本地客户端也不知道怎么做,需要自己挑机制。它可以像登录到远程机器并启动工作进程(即,运行 python)一样简单,但是还有一些更复杂的系统,例如超级计算机调度程序、yarn 和 kubernetes。

文档Worker清楚地表明您在此处实例化调用发生的位置。

在更进一步之前,您也许应该考虑一下您要实现的目标是什么,并描述...


推荐阅读