首页 > 解决方案 > 如何让演员同时做两件事?

问题描述

我定义 alearner和 a worker。我希望它在后台learner运行其成员函数,并偶尔发送一些信息进行打印。learnworkerlearner

下面的代码是一个例子

import ray

@ray.remote
class Learner():
    def __init__(self):
        pass

    def learn(self):
        while True:
            pass # do something, such as updating network 

    def log_score(self, score):
        print('worker', score)

@ray.remote
class Worker():
    def __init__(self, learner):
        self.learner = learner

    def sample(self):
        for i in range(1000000):
            if i % 1000 == 0:
                self.learner.log_score.remote(i)

ray.init()

learner = Learner.remote()
worker = Worker.remote(learner)


worker.sample.remote()
learner.learn.remote()

while True:
    pass

但是,直到完成learner才会运行,这不是我想要的。我已经想到了一种使它起作用的方法:我没有显式调用,而是调用它。具体来说,我重新定义并如下log_scorelearnLearner.learnWorkerlearnsample

"""Learner"""
def learn(self):
    # no loop here
    pass # do something, such as updating network 

"""Worker"""
def sample(self):
    for i in range(1000000):
        if i % 1000 == 0:
            self.learner.learn.remote()
            self.learner.log_score.remote(i)

虽然这可行,但现在我必须控制应该多久learn调用一次,这似乎有点多余。有没有更好的方法来实现我想要的?

标签: pythonparallel-processingray

解决方案


这是一个很好的问题。在 Ray 的 Actor 模型中,每个 Actor 任务都是原子的,因为 Actor 将一次执行任务,并且在前一个任务返回之前不会开始新的任务。这种选择简化了关于并发性的推理,但让参与者同时做两件事变得更加困难。

要完成这样的工作,您基本上有两个选择。

  1. 线程:让actor在后台线程中做一些工作,让actor的主线程空闲,以便它可以执行新任务。

    import ray
    import threading
    import time
    
    @ray.remote
    class Actor(object):
        def __init__(self):
            self.value = 0
            self.t = threading.Thread(target=self.update, args=())
            self.t.start()
    
        def update(self):
            while True:
                time.sleep(0.01)
                self.value += 1
    
        def get_value(self):
            return self.value
    
    ray.init()
    
    # Create the actor. This will start a long-running thread in the background
    # that updates the value.
    a = Actor.remote()
    
    # Get the value a couple times.
    print(ray.get(a.get_value.remote()))
    print(ray.get(a.get_value.remote()))
    
  2. 更小的工作单元:这意味着重新构建代码,以便没有任何actor方法永远循环。learn在您的示例中,您可以在经过一定次数的循环后使函数返回。learn在这种情况下,必须不断提交新任务。甚至可以让learn方法 submit 返回并提交自己,以便在两者之间安排其他方法。有很多方法可以做到这一点,这取决于您的应用程序,但下面是一个示例。

    import ray
    import threading
    import time
    
    @ray.remote
    class Actor(object):
        def __init__(self):
            self.value = 0
    
        def set_handle_to_self(self, handle_to_self):
            self.handle_to_self = handle_to_self
    
        def learn(self):
            for _ in range(10):
                time.sleep(0.01)
                self.value += 1
    
            # Submit the learn task again so that the learning continues
            # but other methods can be scheduled in between.
            self.handle_to_self.learn.remote()
    
        def get_value(self):
            return self.value
    
    ray.init()
    
    # Create the actor. This will start a long-running thread in the background
    # that updates the value.
    a = Actor.remote()
    # Give the actor a handle to itself so that it can submit tasks to itself.
    a.set_handle_to_self.remote(a)
    
    # Start the learning, which will continue forever.
    a.learn.remote()
    
    # Get the value a couple times.
    print(ray.get(a.get_value.remote()))
    print(ray.get(a.get_value.remote()))
    

推荐阅读