python - 在子流程之间传递用户定义的对象
问题描述
我有三个文件如下
""" main.py """
import time
import ray
class LocalBuffer(dict):
def __call__(self):
return self
@ray.remote
class Worker():
def __init__(self, learner):
self.local = LocalBuffer()
self.learner = learner
def sample(self):
for i in range(10):
self.local.update({
'state': [1, 2, 3]
})
print(self.local)
self.learner.update_buffer.remote(self.local)
@ray.remote
class Learner():
def __init__(self):
self.buffer = {}
def update_buffer(self, local_buffer):
print(local_buffer)
self.buffer['state'] = local_buffer['state']
ray.init()
learner = Learner.remote()
worker = Worker.remote(learner)
worker.sample.remote()
time.sleep(10)
如果我删除所有与ray
. 如果没有,就会发生错误。错误消息说state
inlocal_buffer
中没有update_buffer
。我知道错误是由于在中LocalBuffer
定义的worker.py
——如果我定义Worker.local
为内置的dict
,一切都会好起来的。但是为什么我不能用LocalBuffer
?我在这里确实需要它,但我不知道如何使它工作。
更新
我知道问题出在哪里。原因是worker
和learner
处于不同的进程中。而用户定义的对象如self.local
不能在进程之间传递。对于这个特定问题,我可以通过强制转换self.local
为dict
whenself.local
被传递给self.learner.update_buffer
. 我尝试导入LocalBuffer
in learner.py
,但没有成功。也许我必须更多地了解多处理才能弄清楚。如果有人愿意填写一些有用的信息,我将不胜感激。
解决方案
我们必须制作LocalBuffer
一个射线演员才能使其工作。以下代码按需要工作。
import ray
@ray.remote
class LocalBuffer(dict):
# have to redefine these functions in order to make it work with ray
def __getitem__(self, k):
return super().__getitem__(k)
def update(self, d):
super().update(d)
def __call__(self):
# cannot return self since self is a ray actor
return dict(super().items())
@ray.remote
class Worker():
def __init__(self, learner):
self.local = LocalBuffer.remote()
self.learner = learner
def sample(self):
for i in range(10):
id = self.local.update.remote({
'state': [1, 2, 3]
})
print(ray.get(self.local.__call__.remote()))
self.learner.update_buffer.remote(self.local)
@ray.remote
class Learner():
def __init__(self):
self.buffer = {}
def update_buffer(self, local_buffer):
print(ray.get(local_buffer.__call__.remote()))
self.buffer['state'] = ray.get(local_buffer.__getitem__.remote('state'))
print('learner buffer', self.buffer)
ray.init()
learner = Learner.remote()
worker = Worker.remote(learner)
ray.get(worker.sample.remote())
推荐阅读
- apache-flink - Flink JobGraph 提交
- apache-spark - IndexError:导入 findspark.init 时列表索引超出范围
- angular - 你能教 VSC linter 你自己的语法吗?
- python - 当我的计算机上有文件时,我没有得到这样的文件或目录
- scala - 使用 scala spark 加入两个数据框
- mongodb - 在 spring-data mongodb 中构建动态查询的 Criteria API
- continuous-integration - 使用 Github Actions 报告条件作业的状态
- for-loop - 将 NxN for 循环修改为更简单
- azure - 发生 Azure Function App (python) 部署时生成通知
- powerbi - 每月数据在绘制时切换为每日数据