首页 > 解决方案 > 并行化使用多处理运行进程顺序而不是同时

问题描述

我正在尝试使用多处理模块并行化下面给出的一段代码。我尝试的一切都会导致每个子进程一个接一个地运行,即使它们都有不同的 PID。我努力了:

  1. CentOS 和 MacOS
  2. 作为 spawn 和 fork 的上下文
  3. 使用队列和使用池
  4. 使用 Apply 和 Using map 及其异步版本
  5. 添加/删除 pool.join() 和 Process.join()

我无法弄清楚我做错了什么。

fs.py:

import numpy as np
from time import sleep
import os

def f(r):
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(10)
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}

游乐场.py:

import multiprocessing as mp
import numpy as np
from fs import f


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    p = ctx.Pool(4)
    with p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = [p.apply(f, (subset, )) for subset in subsets]
        print(res)

    print('Done!')

命令:python playground.py

输出:

I am 29881
I am 29881 and I am finished
I am 29882
I am 29882 and I am finished
I am 29881
I am 29881 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])}, {'nums': array([3, 4, 5]), 
  'dubs': array([ 6,  8, 10])}, {'nums': array([6]), 'dubs': array([12])}]
Done!

标签: pythonparallel-processingpython-multiprocessing

解决方案


当我p.map()这样使用时(在 Linux Mint 上)

res = p.map(f, subsets)

然后我得到

I am 1337328
I am 1337325
I am 1337327
I am 1337328 and I am finished
I am 1337325 and I am finished
I am 1337327 and I am finished

也许你用map()错了方式。res = [p.map(f, (subset, )) for subset in subsets]


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = p.map(f, subsets)
        print(res)
        
    print('Done!')

因为apply_async你需要两个for循环

    items = [p.apply_async(f, (subset, )) for subset in subsets]
    res = [x.get() for x in items]
    print(res)

两者都必须在里面with p:


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]

        items = [p.apply_async(f, (subset, )) for subset in subsets]
        print(items)
        
        res = [x.get() for x in items]
        print(res)
        
    print('Done!')

推荐阅读