python - 如何让演员同时做两件事?
问题描述
我定义 alearner
和 a worker
。我希望它在后台learner
运行其成员函数,并偶尔发送一些信息进行打印。learn
worker
learner
下面的代码是一个例子
import ray
@ray.remote
class Learner():
def __init__(self):
pass
def learn(self):
while True:
pass # do something, such as updating network
def log_score(self, score):
print('worker', score)
@ray.remote
class Worker():
def __init__(self, learner):
self.learner = learner
def sample(self):
for i in range(1000000):
if i % 1000 == 0:
self.learner.log_score.remote(i)
ray.init()
learner = Learner.remote()
worker = Worker.remote(learner)
worker.sample.remote()
learner.learn.remote()
while True:
pass
但是,直到完成learner
才会运行,这不是我想要的。我已经想到了一种使它起作用的方法:我没有显式调用,而是调用它。具体来说,我重新定义并如下log_score
learn
Learner.learn
Worker
learn
sample
"""Learner"""
def learn(self):
# no loop here
pass # do something, such as updating network
"""Worker"""
def sample(self):
for i in range(1000000):
if i % 1000 == 0:
self.learner.learn.remote()
self.learner.log_score.remote(i)
虽然这可行,但现在我必须控制应该多久learn
调用一次,这似乎有点多余。有没有更好的方法来实现我想要的?
解决方案
这是一个很好的问题。在 Ray 的 Actor 模型中,每个 Actor 任务都是原子的,因为 Actor 将一次执行任务,并且在前一个任务返回之前不会开始新的任务。这种选择简化了关于并发性的推理,但让参与者同时做两件事变得更加困难。
要完成这样的工作,您基本上有两个选择。
线程:让actor在后台线程中做一些工作,让actor的主线程空闲,以便它可以执行新任务。
import ray import threading import time @ray.remote class Actor(object): def __init__(self): self.value = 0 self.t = threading.Thread(target=self.update, args=()) self.t.start() def update(self): while True: time.sleep(0.01) self.value += 1 def get_value(self): return self.value ray.init() # Create the actor. This will start a long-running thread in the background # that updates the value. a = Actor.remote() # Get the value a couple times. print(ray.get(a.get_value.remote())) print(ray.get(a.get_value.remote()))
更小的工作单元:这意味着重新构建代码,以便没有任何actor方法永远循环。
learn
在您的示例中,您可以在经过一定次数的循环后使函数返回。learn
在这种情况下,必须不断提交新任务。甚至可以让learn
方法 submit 返回并提交自己,以便在两者之间安排其他方法。有很多方法可以做到这一点,这取决于您的应用程序,但下面是一个示例。import ray import threading import time @ray.remote class Actor(object): def __init__(self): self.value = 0 def set_handle_to_self(self, handle_to_self): self.handle_to_self = handle_to_self def learn(self): for _ in range(10): time.sleep(0.01) self.value += 1 # Submit the learn task again so that the learning continues # but other methods can be scheduled in between. self.handle_to_self.learn.remote() def get_value(self): return self.value ray.init() # Create the actor. This will start a long-running thread in the background # that updates the value. a = Actor.remote() # Give the actor a handle to itself so that it can submit tasks to itself. a.set_handle_to_self.remote(a) # Start the learning, which will continue forever. a.learn.remote() # Get the value a couple times. print(ray.get(a.get_value.remote())) print(ray.get(a.get_value.remote()))
推荐阅读
- airflow - 我杀死了 Airflow 任务,但它的 BashOperator 创建的子进程尚未被杀死
- swift - 如何从明天开始每天重复本地通知
- haskell - 有没有办法访问 Wai Web App 的 cookie 标头?
- oracle - 如何连接到我 PC 上的默认 Oracle10g 实例?
- angular - 输入上的 Angular 6 PipeTransform 在 IE11 上向左跳克拉
- dart - 颤振/飞镖在桌面上作为原生应用程序工作吗?
- laravel - 如何在laravel中传递月份值并获取月份的开始和结束日期
- matlab - 如何在matlab中填充图像之间的空白空间以进行体积成像/ 3D重建
- mysql - 通过 Ruby 问题将 PC 数据记录到数据库中
- java - 如何启用功能键在 Android Studios 模拟器中工作?