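python-3.x - How to use client.scatter correctly in Dask
Question
When executing a "large" number of tasks, I get this error:
Consider scattering large objects ahead of time with client.scatter to reduce scheduler burden and keep data on workers
I also get a bunch of messages like these: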
tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x7f20d25e10b8>>
Traceback (most recent call last):
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/tornado.py", line 542, in _keep_alive
    c.send_ping()
  File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/connection.py", line 80, in send_ping
    self._socket.ping(codecs.encode(str(self._ping_count), "utf-8"))
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/websocket.py", line 447, in ping
    raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError
tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x7f20d25e10b8>>
Traceback (most recent call last):
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/tornado.py", line 542, in _keep_alive
    c.send_ping()
  File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/connection.py", line 80, in send_ping
    self._socket.ping(codecs.encode(str(self._ping_count), "utf-8"))
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/websocket.py", line 447, in ping
    raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:52950 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:52964 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:52970 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:52984 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:52986 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53002 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53016 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53018 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53038 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53042 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53048 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53060 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53068 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53072 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53146 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53156 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53170 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53178 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53186 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53188 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53192 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53194 remote=tcp://127.0.0.1:37945>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:53196 remote=tcp://127.0.0.1:37945>
These tasks are executed in ClassCreatingTheIssue, where (I think) I do not have access to the client. To give you an idea, the script that calls these things is pasted below:
from dask.distributed import Client, LocalCluster
import sys
sys.path.append('../../')
from mypackage import SomeClass
from mypackage.module2 import SomeClass2
from mypackage.module3 import ClassCreatingTheIssue

def train():
    calc = SomeClass(something=SomeClass2(**stuff),
                     something2=ClassCreatingTheIssue())
    calc.train(training_set=images)

if __name__ == '__main__':
    cluster = LocalCluster(n_workers=8, threads_per_worker=2)
    client = Client(cluster, asyncronous=True)
    train()
I was able to narrow it down to the function that triggers this error. It looks like this:
def get_lt(self, index):
    """Return LT vectors

    Parameters
    ----------
    index : int
        Index of image.

    Returns
    -------
    _LT : list
        Returns a list that maps atomic fingerprints in the images.
    """
    _LT = []
    for i, group in enumerate(self.fingerprint_map):
        if i == index:
            for _ in group:
                _LT.append(1.)
        else:
            for _ in group:
                _LT.append(0.)
    return _LT
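This delayed function basically returns a very large list. How should client.scatter be used in this case? I would really appreciate any help!
Note: sometimes the whole application dies at that point and everything fails. I will confirm this later, since another test is running right now.
Answer
What version of Dask Distributed are you using? I am on 1.26, and it has this warning message: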
/Users/scott/anaconda3/lib/python3.6/site-packages/distributed/worker.py:2791: UserWarning: Large object of size 8.00 MB detected in task graph:
(array([[ 0.02152672, 0.09287627, -0.32135721, .. ... 1.25601994]]),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s))
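This warning message has been around for a while (I do not have an exact figure; GitHub's blame tool is not very useful here).
Here is a code snippet that illustrates the point: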
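import numpy as np
from distributed import Client

client = Client()

def f(x):
    return x.sum()

N = 1_000
x = np.random.randn(N, N)

# Passing the array directly embeds it in the task graph (this triggers the warning).
r1 = client.submit(f, x).result()

# Scattering first moves the data to a worker; the task then receives only a future.
x_scattered = client.scatter(x)
r2 = client.submit(f, x_scattered).result()
assert r1 == r2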
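For the situation in the question, where the tasks are created inside a class that is never handed the client, one possible approach is to look the client up with get_client() and scatter the shared data once before submitting the tasks. The following is only a sketch under assumptions: Trainer and row_sum are hypothetical names invented for illustration (they are not from mypackage), and processes=False is used only to keep the example lightweight.

```python
import numpy as np
from dask.distributed import Client, get_client


def row_sum(data, i):
    # Each task reads a single row of the shared array.
    return float(data[i].sum())


class Trainer:
    """Hypothetical stand-in for a class that is never handed the client."""

    def __init__(self, data):
        self.data = data

    def train(self, n_tasks):
        # Look up the already-running client instead of passing it around.
        client = get_client()
        # Send the data to the workers once; broadcast=True copies it to all of them.
        data_future = client.scatter(self.data, broadcast=True)
        # Every task receives the small future, not the large array itself.
        futures = [client.submit(row_sum, data_future, i) for i in range(n_tasks)]
        return client.gather(futures)


# processes=False runs the workers as threads in this process (an assumption
# made to keep the sketch light; a LocalCluster with processes also works).
client = Client(processes=False)
results = Trainer(np.ones((1_000, 1_000))).train(n_tasks=10)
client.close()
```

Because the same future is passed to every task, the large array is sent to the workers once instead of being serialized into the task graph for each submission.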