Multiprocessing advice and stopping processes

Problem description

I am trying to implement a system in which several Actor processes generate data and a single Learner samples batches of it from a shared replay buffer.

To achieve this, I append the data generated by the Actors to a `multiprocessing.Queue`, and I use one process to push that data into my Replay. I share the Replay between processes using a `multiprocessing.BaseManager`.

Here is my implementation (the code runs):

import time
import random
from collections import deque
import torch.multiprocessing as mp
from multiprocessing.managers import BaseManager

T = 20
B = 5
REPLAY_MINIMUM_SIZE = 10
REPLAY_MAXIMUM_SIZE = 100


class Actor:
    def __init__(self, global_buffer, rank):
        self.rank = rank
        self.local_buffer = []
        self.global_buffer = global_buffer

    def run(self, num_steps):
        for step in range(num_steps):
            data = f'{self.rank}_{step}'
            self.local_buffer.append(data)

            if len(self.local_buffer) >= B:
                self.global_buffer.put(self.local_buffer)
                self.local_buffer = []


class Learner:
    def __init__(self, replay):
        self.replay = replay

    def run(self, num_steps):
        while self.replay.size() <= REPLAY_MINIMUM_SIZE:
            time.sleep(0.1)
        for step in range(num_steps):
            batch = self.replay.sample(B)
            print(batch)

class Replay:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, experiences):
        self.memory.extend(experiences)

    def sample(self, n):
        return random.sample(self.memory, n)

    def size(self):
        return len(self.memory)


def send_data_to_replay(global_buffer, replay):
    while True:
        if not global_buffer.empty():
            batch = global_buffer.get()
            replay.push(batch)


if __name__ == '__main__':
    num_actors = 2

    global_buffer = mp.Queue()

    BaseManager.register("ReplayMemory", Replay)
    Manager = BaseManager()
    Manager.start()
    replay = Manager.ReplayMemory(REPLAY_MAXIMUM_SIZE)

    learner = Learner(replay)
    learner_process = mp.Process(target=learner.run, args=(T,))
    learner_process.start()

    actor_processes = []
    for rank in range(num_actors):
        p = mp.Process(target=Actor(global_buffer, rank).run, args=(T,))
        p.start()
        actor_processes.append(p)

    replay_process = mp.Process(target=send_data_to_replay, args=(global_buffer, replay,))
    replay_process.start()

    learner_process.join()
    [actor_process.join() for actor_process in actor_processes]
    replay_process.join()

I have followed several tutorials and read websites about multiprocessing, but I am very new to distributed computing, so I am not sure whether what I am doing is correct.

I would like to know whether there is anything improper in my code or anything that does not follow good practice. Also, when I launch the program, the different processes never terminate, and I am not sure why or how to handle it.

Any feedback is appreciated!

Tags: python, multiprocessing

Solution


I have found that when using multiprocessing it is best to set up a queue for each running process. When you are ready to shut the application down, you can send an exit message (or poison pill) to each queue and shut each process down cleanly.
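The exit-message pattern can be sketched as follows; the `None` sentinel and the `consume` helper are illustrative choices for this sketch, not part of any library:

```python
import multiprocessing as mp

SENTINEL = None  # poison pill: a value the producers never send as real data


def consume(q, out):
    """Drain q until the poison pill arrives, then report what was seen."""
    items = []
    while True:
        item = q.get()          # blocks, so no busy-waiting on q.empty()
        if item is SENTINEL:
            break
        items.append(item)
    out.put(items)


if __name__ == '__main__':
    q, out = mp.Queue(), mp.Queue()
    p = mp.Process(target=consume, args=(q, out))
    p.start()
    for i in range(3):
        q.put(i)
    q.put(SENTINEL)             # without this, p never exits and join() hangs
    print(out.get())            # [0, 1, 2]
    p.join()                    # returns promptly once the pill is consumed
```

Any unique object the workers can recognize works as the pill; `None` is convenient as long as `None` is never valid data.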

When you start a child process, pass both the parent's queue and the child's queue to the new process through inheritance (i.e. as arguments at process creation).
