python - Dask 自适应缩放因错误而失败
问题描述
我正在尝试在 Dask 中实现自适应缩放。但是在循环中出现以下错误。
AttributeError: 'NoneType' object has no attribute 'adaptive_target'
痕迹
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x10ab2a0b8>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at /Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:179> exception=AttributeError("'NoneType' object has no attribute 'adaptive_target'")>)
Traceback (most recent call last):
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 191, in adapt
target = await self.safe_target()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 129, in safe_target
n = await self.target()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 145, in target
return await self.scheduler.adaptive_target(
AttributeError: 'NoneType' object has no attribute 'adaptive_target'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x10ab2a0b8>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at /Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:179> exception=AttributeError("'NoneType' object has no attribute 'adaptive_target'")>)
Traceback (most recent call last):
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 191, in adapt
target = await self.safe_target()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 129, in safe_target
n = await self.target()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 145, in target
return await self.scheduler.adaptive_target(
AttributeError: 'NoneType' object has no attribute 'adaptive_target'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x10ab2a0b8>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at /Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:179> exception=AttributeError("'NoneType' object has no attribute 'adaptive_target'")>)
Traceback (most recent call last):
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 191, in adapt
target = await self.safe_target()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 129, in safe_target
n = await self.target()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 145, in target
return await self.scheduler.adaptive_target(
AttributeError: 'NoneType' object has no attribute 'adaptive_target'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x10ab2a0b8>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at /Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:179> exception=AttributeError("'NoneType' object has no attribute 'adaptive_target'")>)
Traceback (most recent call last):
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 191, in adapt
target = await self.safe_target()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 129, in safe_target
n = await self.target()
File "/Users/jpujari/Documents/sandbox/project/venv/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 145, in target
return await self.scheduler.adaptive_target(
AttributeError: 'NoneType' object has no attribute 'adaptive_target'
我有一个自定义集群类,如下所示
from distributed.deploy import Cluster
import boto3
class EcsCluster(Cluster):
def __init__(self, gpu=False):
# status = 'init'
self.gpu = gpu
super().__init__(asynchronous=False)
def get_cluster_name(self):
return 'jp-test'
def get_service_name(self):
service_name = 'jp-test-gpu-worker-service' if self.gpu else 'jp-test-cpu-worker-service'
return service_name
def scale_up(self, no_of_workers):
try:
client = boto3.client('ecs')
response = client.update_service(
cluster=self.get_cluster_name(),
service=self.get_service_name(),
desiredCount=no_of_workers,
)
except Exception as ex:
print(f"Problem occurred during scale up: {ex}")
def scale_down(self, no_of_workers=0):
try:
client = boto3.client('ecs')
response = client.update_service(
cluster=self.get_cluster_name(),
service=self.get_service_name(),
desiredCount=no_of_workers,
)
print(response)
except Exception as ex:
print(f"Problem occurred during scale down: {ex}")
这是我试图运行的代码
from project.distributed.ecscluster import EcsCluster
from dask.distributed import Client
cluster = EcsCluster()
cluster.adapt(minimum=0, maximum=5)
解决方案
为了获得自适应缩放,Dask Cluster 子类需要实现一些属性。我建议查看 Adaptive 类以获取更多信息。