python - 多处理建议和停止进程
问题描述
我正在尝试实现一个系统,其中:
Actors
生成数据Replay
是一个管理由 生成的数据的类Actors
(理论上它比下面的代码做的要多得多,但我把它放在这里很简单)Learner
使用Replay类的数据(有时会更新一些数据Replay
)
为了实现这一点,我将生成的 Actors 数据附加到 amultiprocessing.Queue
中,我使用一个进程将我的数据推送到我的 Replay。我用一个multiprocessing.BaseManager
来分享Replay
。
这是我的实现(代码正在运行):
import time
import random
from collections import deque
import torch.multiprocessing as mp
from multiprocessing.managers import BaseManager
T = 20
B = 5
REPLAY_MINIMUM_SIZE = 10
REPLAY_MAXIMUM_SIZE = 100
class Actor:
def __init__(self, global_buffer, rank):
self.rank = rank
self.local_buffer = []
self.global_buffer = global_buffer
def run(self, num_steps):
for step in range(num_steps):
data = f'{self.rank}_{step}'
self.local_buffer.append(data)
if len(self.local_buffer) >= B:
self.global_buffer.put(self.local_buffer)
self.local_buffer = []
class Learner:
def __init__(self, replay):
self.replay = replay
def run(self, num_steps):
while self.replay.size() <= REPLAY_MINIMUM_SIZE:
time.sleep(0.1)
for step in range(num_steps):
batch = self.replay.sample(B)
print(batch)
class Replay:
def __init__(self, capacity):
self.memory = deque(maxlen=capacity)
def push(self, experiences):
self.memory.extend(experiences)
def sample(self, n):
return random.sample(self.memory, n)
def size(self):
return len(self.memory)
def send_data_to_replay(global_buffer, replay):
while True:
if not global_buffer.empty():
batch = global_buffer.get()
replay.push(batch)
if __name__ == '__main__':
num_actors = 2
global_buffer = mp.Queue()
BaseManager.register("ReplayMemory", Replay)
Manager = BaseManager()
Manager.start()
replay = Manager.ReplayMemory(REPLAY_MAXIMUM_SIZE)
learner = Learner(replay)
learner_process = mp.Process(target=learner.run, args=(T,))
learner_process.start()
actor_processes = []
for rank in range(num_actors):
p = mp.Process(target=Actor(global_buffer, rank).run, args=(T,))
p.start()
actor_processes.append(p)
replay_process = mp.Process(target=send_data_to_replay, args=(global_buffer, replay,))
replay_process.start()
learner_process.join()
[actor_process.join() for actor_process in actor_processes]
replay_process.join()
我遵循了几个教程并阅读了与多处理相关的网站,但我对分布式计算非常陌生。我不确定我所做的是否正确。
我想知道我的代码中是否存在一些不当行为或没有遵循良好实践的东西。此外,当我启动程序时,不同的进程不会终止。而且我不确定为什么以及如何处理它。
对于任何反馈,我们都表示感谢!
解决方案
我发现在使用多处理时,最好为每个正在运行的进程设置一个队列。当您准备好关闭应用程序时,您可以向每个队列发送退出消息(或毒丸)并干净地关闭每个进程。
当您启动子进程时,通过继承将父队列和子队列传递给新进程。
推荐阅读
- python - “模型”对象没有属性“损失函数”
- php - 每当值 x 比值 y 模 3 大 1 时进行测试
- docker - 无法将 Docker CLI 连接到本地 Docker 守护程序
- reactjs - react-popper 错位 popper 元素
- google-apps-script - Google 表格:是否有 RETRY(FORMULA(), amount_of_retries) 公式?
- java - 从 json 到列表的格式是什么?
- encryption - 如何在 C# 中从内存中删除加密密钥?
- python - 为什么 ubuntu 最终会使用不同版本的 python 和 pip 并且 django 被全局安装而不是在 virtualenv 中?
- python - 奇怪的 Windows 网络接口
- reactjs - 未找到模块。无法解析“资产/样式/常量”