首页 > 解决方案 > 如何使用光线多处理有效地填充列表?

问题描述

我正在做一个项目,我需要收集部署以填充数据集,然后在其上训练模型。我想并行收集这些部署以加快进程,因此我尝试使用 ray 库提供的多处理:

import time
import ray

ray.init()

@ray.remote
class MainActor:
    def __init__(self):
        self.data_set = []

    def process_item(self, item):
        # process incoming item
        item+=1
        return item

    def append(self, val):
        # add item to data set after processing it
        item = self.process_item(val)
        self.data_set.append(item)

    def train(self):
        # train on datatset
        return len(self.data_set)

main_actor = MainActor.remote()

@ray.remote
def rollout_collector(main_actor):
    t = time.time()
    for i in range(40000):
        main_actor.append.remote(i)
    print("time per rollout : ",time.time() - t)


t = time.time()
ray.get([rollout_collector.remote(main_actor) for i in range(3)])
ray.get(main_actor.train.remote())
print("total time with multi-processing: ",time.time() - t)

# ============================== Single process ================================

class MainActor:
    def __init__(self):
        self.data_set = []

    def process_item(self, item):
        # process incoming item
        item+=1
        return item

    def append(self, val):
        # add item to data set after processing it
        item = self.process_item(val)
        self.data_set.append(item)

    def train(self):
        # train on datatset
        pass

main_actor = MainActor()
t = time.time()
for i in range(120000):
    main_actor.append(i)
main_actor.train()
print("total time with single process : ",time.time() - t)

rollout_collector 收集项目,然后在处理后将它们存储在 MainActor 中,以最终对其进行训练。但是,这种方法非常慢:

在此处输入图像描述

使用光线时的每次推出需要 20 秒,而没有光线和多处理则需要 0.12 秒。我有一个 cpu 用于主要演员,3 个用于 rollout_collectors。我认为我不能将部署存储在 rollout_collector 工作人员中,然后将所有内容发送给主要参与者,因为我正在使用水库采样进行培训。此外,在使用 ray 时,一个非常简单的函数的执行时间非常长:MainActor 上的训练函数只是返回数据集的长度,但仍需要 20 秒才能执行。

所以我的问题是我做错了什么?实现我的目标的最佳方式是什么?

标签: pythonmultiprocessingreinforcement-learningray

解决方案


跨进程 RPC ( actor.append.remote(i)) 的成本肯定会高于修改进程内数据结构 ( list.append(i))。前者涉及序列化/反序列化、通过系统调用的进程间通信和排队,因为您是从 3 个并发工作进程调用它。花了这么长时间的原因actor.train.remote()可能也是由于 120K 排队的远程调用。

如果实际上您的推出过程需要更多时间,那么多处理肯定会加快速度。我在最后为您和单进程循环添加了time.sleep(0.01)延迟,将推出次数减少到 400,然后多进程版本需要 4.5 秒才能完成,而单进程版本需要 13 秒。rollout_collector

如果您的个人部署非常快,请考虑在工作进程中本地对结果进行批处理,然后再将它们发送给主要参与者。


推荐阅读