首页 > 解决方案 > 用于 Actor 对象中的方法时,远程调用会阻塞吗?

问题描述

执行以下操作不会同时进行,而是先执行 Run1 并阻塞,直到完成,然后再执行 Run2。

@ray.remote
class Test:
    def __init__(self):
        pass

    def Run1(self):
        print('Run1 Start')
        sleep(5)
        print('Run1 End')

    def Run2(self):
        print('Run2')

ray.init()
test = Test.remote()
test.Run1.remote()
test.Run2.remote()

sleep(10)

输出:

(pid=8109) Run1 Start
(pid=8109) Run1 End
(pid=8109) Run2

这有点出乎意料。如何强制方法同时执行?

编辑以解决后续评论:

执行双线程方法似乎不起作用。下面的代码始终导致 PyArrow 的管道损坏。我想同时运行 self.PreloadSamples 方法和 self.Optimize 方法。BufferActor 类通过 @ray.remote 修饰的 GetSamples() 方法收集和提供批处理样本。由于 GPU 上的数据不可序列化,因此这需要在优化器对象端完成,并且我想确保这是并行完成的,而不是相对于优化按顺序完成的。

请参阅下面的问题的完全隔离版本,该版本在运行约 1 分钟后复制问题:

import torch
import ray
import threading
from time import sleep


def Threaded(fn):
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return wrapper

@ray.remote
class BufferActor():
    def __init__(self):
        pass

    def GetSamples(self):
        return torch.randn(32, 100)


@ray.remote(num_gpus=1)
class OptimizerActor():
    def __init__(self, bufferActor):
        self.bufferActor = bufferActor
        self.samplesOnGPU = list()

        self.PreloadSamples()
        self.Optimize()

    @Threaded
    def PreloadSamples(self):
        #this retrieves a batch of samples (in numpy/torch format on CPU)
        if (len(self.samplesOnGPU) < 5):
            samples = ray.get(self.bufferActor.GetSamples.remote())

            self.samplesOnGPU.append(samples.to('cuda'))

            print('Samples Buffer: %s' % len(self.samplesOnGPU))
        else:
            sleep(0.01)

        self.PreloadSamples()

    @Threaded
    def Optimize(self):
        if (len(self.samplesOnGPU) > 0):
            samples = self.samplesOnGPU.pop(0)
            print('Optimizing')

            #next we perform loss calc + backprop + optimizer step (not shown)

        sleep(0.01)
        self.Optimize()



ray.init()

bufferActor = BufferActor.remote()
optimizerActor = OptimizerActor.remote(bufferActor)

sleep(60*60)

标签: ray

解决方案


Actor 将一次执行一种方法以避免并发问题。如果你想与actors并行(你通常会这样做),最好的方法是启动两个(或更多)actors并向它们都提交任务。


推荐阅读