首页 > 解决方案 > 如何使用多处理加速嵌套循环?

问题描述

我正在尝试使用内部 cpu 绑定任务加速嵌套循环

def main():
    for i in range(10):
       cpu_bound_task()
       for j in range(10):
          cpu_bound_task()
          for k in range(100):
             cpu_bound_task()

为简单起见,cpu-bound 任务定义为:

def cpu_bound_task():
    _ = [0]*1000000

基于从其他帖子(thisthis)的简单推导,在最后一个循环上使用 pool.map() 就可以了(前提是循环足够大;否则启动池的开销似乎违背了创建的目的一个水池):

from multiprocessing import Pool
def main_pool():
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
          cpu_bound_task()
          pool = Pool()
          pool.map(func=cpu_bound_task, iterable=()*100)
          pool.close()
          pool.join()

但是,我有两个额外的要求:

  1. 第一个循环也必须在 3 个并行进程中运行(我知道这不会提高速度)
  2. 循环的每次迭代必须等待该迭代中的所有过程完成,然后才能进行下一次迭代

我的方法(这可能不是最好的!)涉及创建一个 WaitGroup 类,其实例将与共享队列建立连接(将任务添加到该队列,发出任务完成信号,并等待组的进程完成)。然后,多个进程将运行一个 run_func() 函数,该函数将从该队列中获取任务并执行它们。

run_func() 函数定义为:

def run_func(q):
    while True:
        task = q.get()
        func, kwargs = task
        if func is None: break # signals end of all tasks
        func(**kwargs, q=q)

WaitGroup 类定义为:

class WaitGroup():
    
    def __init__(self, group_id, max_p, wg_shared_inputs):
        self.group_id = group_id
        self.max_p = max_p # maximum elements sent to the queue
        self.wait_count = wg_shared_inputs['values'][self.group_id]
        self.cv = wg_shared_inputs['conditions'][self.group_id]
    
    def add(self, func, kwargs, q):
        '''add task to the queue'''
        self.cv.acquire()
        if self.max_p:
            while self.wait_count.value >= self.max_p: # >= or >, check
                self.cv.wait() # releases lock automatically
        q.put((func,{**kwargs,'parent_wg':self}))
        self.wait_count.value += 1
        self.cv.release()
    
    def done(self):
        '''mark task as completed'''
        self.cv.acquire()
        if self.wait_count.value > 0:
            self.wait_count.value -= 1
        if self.wait_count.value == 0:
            self.cv.notify_all()
        self.cv.release()
    
    def wait(self):
        '''wait for a group of tasks to be completed'''
        self.cv.acquire()
        while self.wait_count.value > 0:
            self.cv.wait()
        self.cv.release()

wg_shared_inputs 是一个预先创建的简单字典,其中包含 manager.Value() 和 manager.Condition() 的空实例。(理想情况下,这些实例将在需要时由 WaitGroup 类创建,但不幸的是我似乎无法做到这一点,因为 WorkGroup 实例作为参数传递给 Processes。所以我必须预先确定需要多少个实例)

最后一步是将循环拆分为多个步骤,主函数定义为:

def main_q():
    
    # Handle Manager Variables
    from multiprocessing import Manager
    manager = Manager()
    q = manager.Queue()
    values = manager.dict({0:manager.Value('i',0), 1:manager.Value('i',0), 2:manager.Value('i',0)})
    conditions = manager.dict({0:manager.Condition(), 1:manager.Condition(), 2:manager.Condition()})
    wg_shared_inputs = {'values':values, 'conditions':conditions}
    
    # Launch Processes
    from multiprocessing import Process
    num = 10
    processes = [Process(target=run_func, args=(q,)) for _ in range(num)]
    for p in processes: p.start()
        
    # Create & Launch Wait Group
    wg = WaitGroup(group_id=0, max_p=3, wg_shared_inputs=wg_shared_inputs)
    for i in range(20): wg.add(step1, {'i':i, 'wg_shared_inputs':wg_shared_inputs}, q)
    wg.wait()

    # End Queue
    for _ in range(num): q.put((None,{})) # signals end of all tasks
        
    # Join Processes
    for p in processes: p.join()

并将后续步骤定义为:

def step1(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    wg = WaitGroup(group_id=1, max_p=1, wg_shared_inputs=wg_shared_inputs)
    for j in range(10): wg.add(step2, {'i':i, 'wg_shared_inputs':wg_shared_inputs}, q)
    wg.wait()
    parent_wg.done()
    
def step2(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    wg = WaitGroup(group_id=2, max_p=None, wg_shared_inputs=wg_shared_inputs)
    for k in range(100): wg.add(step3, {'i':i, 'wg_shared_inputs':wg_shared_inputs}, q)
    wg.wait()
    parent_wg.done()

def step3(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    parent_wg.done()

运行我得到的 3 个不同的选项:

SIMPLE VERSION (main())
Completed in 84.85 seconds

POOL VERSION (main_pool())
Completed in 62.62 seconds

QUEUE VERSION (main_q())
Completed in 131.84 seconds

我对结果感到惊讶。任何想法为什么队列版本要慢得多?或者有什么想法可以用不同的方法来实现我的目标?

标签: pythonparallel-processingmultiprocessingnested-loops

解决方案


首先,您的代码main_pool并不完全正确:iterable=()*100相当于iterable=(). 仍在使用的修改版本map可能是:

def cpu_bound_task():
    _ = [0]*1000000

def cpu_bound_task_adapter(index):
    return cpu_bound_task()

def main_pool():
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            pool = Pool()
            pool.map(func=cpu_bound_task_adapter, iterable=range(100))
            pool.close()
            pool.join()

在我的桌面上,您的 SIMPLE VERSION (main()) 在 67.9 秒内完成,而您更正后的 POOL VERSION (main_pool()) 在 46.2 秒内完成(我有 8 个逻辑内核、4 个物理内核)。

但是没有理由在循环中反复创建和拆除池。相反,在一开始就创建一次池:

def main_pool():
    pool = Pool()
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            pool.map(func=cpu_bound_task_adapter, iterable=range(100))
    pool.close()

现在运行时间为 20.9 秒。

如果我修改您的代码以明确指定池大小为 3,则修改后的版本将在 28.4 秒内完成。


推荐阅读