python - Dask 以编程方式启动远程工作人员
问题描述
我需要以编程方式创建远程工作者并将它们用于任务,然后将它们关闭。
文档中给出的示例代码适用于所写的内容:
import asyncio
from distributed import Worker, Scheduler, Client
from distributed.scheduler import WorkerState
s = "x.x.x.x:8786" # remote IP, not local, started from command line.
async def f():
async with Worker(s) as w1, Worker(s) as w2:
async with Client(s, asynchronous=True) as client:
future = client.submit(lambda x: x + 1, 10)
result = await future
print(result)
asyncio.get_event_loop().run_until_complete(f())
假设我有n
不同的机器,而不是 dask scheduler - ip1, ip2, ..ipn
。现在,我面临两个问题:
- 连接到远程调度程序后,我想在多台机器上创建工作人员。比方说
ip1, ip2, ip3
。尝试在创建中同时使用host
和contact_address
参数。Worker
工人从调度程序的本地本身开始,而不是所需的机器。如何在连接到同一调度程序的所需机器上远程启动工作人员? - 我需要
client
在async
函数中创建以随时间在多个submit
,map
, 调用中使用。我也有许多自定义 python 函数。那么,我如何以编程方式在不同的机器上创建工作程序,创建一个client
并在异步函数之外随着时间的推移使用它。我尝试跟随,不成功。
s_address = "x.x.x.x:8786" # remote scheduler IP
async def f():
async with Worker(s_address) as w1, Worker(s_address) as w2:
async with Client(s_address, asynchronous=True) as client:
return client
client_to_use = f() # expecting client object which I can use and...
# ...when everything finishes, hoping context manager kills the workers.
# This clearly doesn't work
asyncio.get_event_loop().run_until_complete(f()) # not sure if this is valid anymore
# What I need to do
custom_module.call_some_fn_to_use_dask_client(client_to_use) # Does not work as well!! ```
解决方案
推荐阅读
- sql - 如何使用 SQL 合并 Access 中的两个表。两个表有几个共同的列,但也有几个不共同的列
- javascript - 遍历数组并分配新值
- c# - 存储过程在价格范围内查找产品
- python - Flask-Limiter 不适用于基于 Flask-Restful API 的应用程序
- email - 如何在 logstash 输出电子邮件中附加附件?
- python - 我遇到了函数错误之外的返回问题
- haskell - 使用类别的 Functor 进行映射
- asp.net - Ajax AutoCompleteExtender 工具包 ASP.net vb
- python - 为什么 ImportError: cannot import name 'AutoReg' from 'statsmodels.tsa.ar_model' 发生?
- python - 解酸堆栈下溢