multiprocessing.Pool.map hangs on the last Process

Problem Description

I am running a program from PyCharm on a Linux server, using multiprocessing.Pool().map to improve performance.

The code looks like this:

import multiprocessing
from functools import partial

for episode in episodes:
    with multiprocessing.Pool() as mpool:
        func_part = partial(worker_function)
        mpool.map(func_part, range(step))
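
For reference, here is a minimal, self-contained sketch of the same pattern (worker_function, episodes, and step are placeholders here, not the original objects). Note the if __name__ == "__main__": guard, which the spawn start method used on Windows requires; partial is where any extra arguments would be bound:

import multiprocessing
from functools import partial

def worker_function(step_index, scale=1):
    # Placeholder worker: stands in for the real training step.
    return step_index * scale

if __name__ == "__main__":                                  # required under the spawn start method
    episodes = range(3)                                     # placeholder episode list
    step = 12
    for episode in episodes:
        with multiprocessing.Pool() as mpool:
            func_part = partial(worker_function, scale=2)   # bind extra arguments here
            results = mpool.map(func_part, range(step))
        print(episode, results)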

The strange thing is that it runs perfectly fine on my Windows 10 laptop, but as soon as I try to run it on the Linux server the program gets stuck on the last Process at measurement count 241/242, i.e. before it moves on to the next loop iteration, e.g. the next episode.

[Screenshot: console output hanging at measurement count 241/242]

No error message is thrown. I run PyCharm on both machines. The Step layer is where I put the multiprocessing.Pool().map call.

Edit:

I have added mpool.close() and mpool.join(), but it seems to have no effect:

import multiprocessing
from functools import partial

for episode in episodes:
    with multiprocessing.Pool() as mpool:
        func_part = partial(worker_function)
        mpool.map(func_part, range(step))
        mpool.close()
        mpool.join()

It still hangs on the last process.

Edit 2:

This is the worker function:

def worker_func(steplength, episode, episodes, env, agent, state, log_data_qvalues, log_data, steps):
    env.time_ = step
    action = agent.act(state, env)                                                                               # given the state, the agent acts (eps-greedy) either by choosing randomly or relying on its own prediction (weights are considered here to sum up the q-values of all objectives)
    next_state, reward = env.steplength(action, state)                                                                # given the action, the environment gives back the next_state and the reward for the transaction for all objectives separately
    agent.remember(state, action, reward, next_state, env.future_reward)                                        # agent puts the experience in his memory
    q_values = agent.model.predict(np.reshape(state, [1, env.state_size]))                                      # This part is not necessary for the framework, but lets the agent predict every time_ to
    start = 0                                                                                                   # to store the development of the prediction and to investigate the development of the Q-values
    machine_start = 0
    for counter, machine in enumerate(env.list_of_machines):
        liste = [episode, steplength, state[counter]]
        q_values_objectives = []
        for objective in range(1, env.number_of_objectives + 1):
            liste.append(objective)
            liste.append(q_values[0][start:machine.actions + start])
            start = int(agent.action_size / env.number_of_objectives) + start
        log_data_qvalues.append(liste)
        machine_start += machine.actions
        start = machine_start
    state = next_state
    steps.append(state)
    env.current_step += 1
    if len(agent.memory) > agent.batch_size:                                                                    # If the agent has collected more than batch_size-experience, the networks of the agents are starting
        agent.replay(env)                                                                                       # to be trained, with the replay function, batch-size- samples from the memory of the agents are selected
    agent.update_target_model()                                                                                 # the Q-target is updated after one batch-training
    if steplength == env.steplength-2:                                                                                # for plotting the process during training
        #agent.update_target_model()
        print(f'Episode: {episode + 1}/{episodes}    Score: {steplength}    e: {agent.epsilon:.5}')
        log_data.append([episode, agent.epsilon])

As you can see, it uses several classes to pass attributes around. I don't know how I would reproduce it. I am still experimenting to find out where exactly the process gets stuck. The worker function communicates with the env and agent classes, which pass along the information needed to train the neural network. The agent class controls the learning process, while the env class simulates the environment that the network controls.
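
One way to narrow down where it gets stuck is to wrap the worker so that every stage is logged with flushed output (buffered prints from child processes often never appear, which makes a hang look silent). A sketch, with worker_func reduced to a stand-in:

import logging
import multiprocessing
import os
import time

def worker_func(steplength):
    # Stand-in for the real worker; the sleep simulates one training step.
    time.sleep(0.1)
    return steplength

def traced_worker(steplength):
    # flush=True matters: without it, a child's buffered output may never
    # show up, so a hang looks completely silent.
    print(f"[pid {os.getpid()}] start step {steplength}", flush=True)
    result = worker_func(steplength)
    print(f"[pid {os.getpid()}] done step {steplength}", flush=True)
    return result

if __name__ == "__main__":
    multiprocessing.log_to_stderr(logging.DEBUG)  # also log the pool's internal events
    with multiprocessing.Pool() as mpool:
        mpool.map(traced_worker, range(12))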

step is an integer variable:

step = 12

Tags: python, parallel-processing, multiprocessing

Solution


Are you calling

mpool.close()
mpool.join()

at the end?
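
For reference, a sketch of the two usual shutdown patterns (square is a placeholder worker). Note that map() already blocks until every result is ready, and a Pool used as a context manager calls terminate() on exit:

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    # Pattern 1: explicit shutdown. close() stops new work from being
    # submitted, join() waits until every worker process has exited.
    mpool = multiprocessing.Pool()
    results = mpool.map(square, range(12))
    mpool.close()
    mpool.join()

    # Pattern 2: context manager. __exit__ calls terminate(), and since
    # map() blocks until all results are in, this is also safe; adding
    # close()/join() inside the with block (as in the edit above) is
    # redundant but harmless.
    with multiprocessing.Pool() as mpool:
        results = mpool.map(square, range(12))
    print(results)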


Edit

The problem is not with multiprocessing but with the measurement count part. According to the screenshot, the pool map finishes successfully at step 11 (range(12) starts from 0). measurement count appears nowhere in the provided snippets, so there is nothing here to debug that part with.
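
One way to confirm that, using a placeholder worker square, is to consume the results one at a time with imap so each finished step becomes visible; anything that hangs after the final print is then provably outside the pool:

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    step = 12
    with multiprocessing.Pool() as mpool:
        for i, result in enumerate(mpool.imap(square, range(step))):
            print(f"step {i} finished -> {result}", flush=True)
    print("pool finished; a hang after this line is outside multiprocessing")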

