python - 如何使用多处理加速嵌套循环?
问题描述
我正在尝试使用内部 cpu 绑定任务加速嵌套循环:
def main():
for i in range(10):
cpu_bound_task()
for j in range(10):
cpu_bound_task()
for k in range(100):
cpu_bound_task()
为简单起见,cpu-bound 任务定义为:
def cpu_bound_task():
_ = [0]*1000000
基于从其他帖子(this或this)的简单推导,在最后一个循环上使用 pool.map() 就可以了(前提是循环足够大;否则启动池的开销似乎违背了创建的目的一个水池):
from multiprocessing import Pool
def main_pool():
for i in range(10):
cpu_bound_task()
for j in range(10):
cpu_bound_task()
pool = Pool()
pool.map(func=cpu_bound_task, iterable=()*100)
pool.close()
pool.join()
但是,我有两个额外的要求:
- 第一个循环也必须在 3 个并行进程中运行(我知道这不会提高速度)
- 循环的每次迭代必须等待该迭代中的所有过程完成,然后才能进行下一次迭代
我的方法(这可能不是最好的!)涉及创建一个 WaitGroup 类,其实例将与共享队列建立连接(将任务添加到该队列,发出任务完成信号,并等待组的进程完成)。然后,多个进程将运行一个 run_func() 函数,该函数将从该队列中获取任务并执行它们。
run_func() 函数定义为:
def run_func(q):
while True:
task = q.get()
func, kwargs = task
if func is None: break # signals end of all tasks
func(**kwargs, q=q)
WaitGroup 类定义为:
class WaitGroup():
def __init__(self, group_id, max_p, wg_shared_inputs):
self.group_id = group_id
self.max_p = max_p # maximum elements sent to the queue
self.wait_count = wg_shared_inputs['values'][self.group_id]
self.cv = wg_shared_inputs['conditions'][self.group_id]
def add(self, func, kwargs, q):
'''add task to the queue'''
self.cv.acquire()
if self.max_p:
while self.wait_count.value >= self.max_p: # >= or >, check
self.cv.wait() # releases lock automatically
q.put((func,{**kwargs,'parent_wg':self}))
self.wait_count.value += 1
self.cv.release()
def done(self):
'''mark task as completed'''
self.cv.acquire()
if self.wait_count.value > 0:
self.wait_count.value -= 1
if self.wait_count.value == 0:
self.cv.notify_all()
self.cv.release()
def wait(self):
'''wait for a group of tasks to be completed'''
self.cv.acquire()
while self.wait_count.value > 0:
self.cv.wait()
self.cv.release()
wg_shared_inputs 是一个预先创建的简单字典,其中包含 manager.Value() 和 manager.Condition() 的空实例。(理想情况下,这些实例将在需要时由 WaitGroup 类创建,但不幸的是我似乎无法做到这一点,因为 WorkGroup 实例作为参数传递给 Processes。所以我必须预先确定需要多少个实例)
最后一步是将循环拆分为多个步骤,主函数定义为:
def main_q():
# Handle Manager Variables
from multiprocessing import Manager
manager = Manager()
q = manager.Queue()
values = manager.dict({0:manager.Value('i',0), 1:manager.Value('i',0), 2:manager.Value('i',0)})
conditions = manager.dict({0:manager.Condition(), 1:manager.Condition(), 2:manager.Condition()})
wg_shared_inputs = {'values':values, 'conditions':conditions}
# Launch Processes
from multiprocessing import Process
num = 10
processes = [Process(target=run_func, args=(q,)) for _ in range(num)]
for p in processes: p.start()
# Create & Launch Wait Group
wg = WaitGroup(group_id=0, max_p=3, wg_shared_inputs=wg_shared_inputs)
for i in range(20): wg.add(step1, {'i':i, 'wg_shared_inputs':wg_shared_inputs}, q)
wg.wait()
# End Queue
for _ in range(num): q.put((None,{})) # signals end of all tasks
# Join Processes
for p in processes: p.join()
并将后续步骤定义为:
def step1(i, wg_shared_inputs, q, parent_wg=None):
cpu_bound_task()
wg = WaitGroup(group_id=1, max_p=1, wg_shared_inputs=wg_shared_inputs)
for j in range(10): wg.add(step2, {'i':i, 'wg_shared_inputs':wg_shared_inputs}, q)
wg.wait()
parent_wg.done()
def step2(i, wg_shared_inputs, q, parent_wg=None):
cpu_bound_task()
wg = WaitGroup(group_id=2, max_p=None, wg_shared_inputs=wg_shared_inputs)
for k in range(100): wg.add(step3, {'i':i, 'wg_shared_inputs':wg_shared_inputs}, q)
wg.wait()
parent_wg.done()
def step3(i, wg_shared_inputs, q, parent_wg=None):
cpu_bound_task()
parent_wg.done()
运行我得到的 3 个不同的选项:
SIMPLE VERSION (main())
Completed in 84.85 seconds
POOL VERSION (main_pool())
Completed in 62.62 seconds
QUEUE VERSION (main_q())
Completed in 131.84 seconds
我对结果感到惊讶。任何想法为什么队列版本要慢得多?或者有什么想法可以用不同的方法来实现我的目标?
解决方案
首先,您的代码main_pool
并不完全正确:iterable=()*100
相当于iterable=()
. 仍在使用的修改版本map
可能是:
def cpu_bound_task():
_ = [0]*1000000
def cpu_bound_task_adapter(index):
return cpu_bound_task()
def main_pool():
for i in range(10):
cpu_bound_task()
for j in range(10):
pool = Pool()
pool.map(func=cpu_bound_task_adapter, iterable=range(100))
pool.close()
pool.join()
在我的桌面上,您的 SIMPLE VERSION (main()) 在 67.9 秒内完成,而您更正后的 POOL VERSION (main_pool()) 在 46.2 秒内完成(我有 8 个逻辑内核、4 个物理内核)。
但是没有理由在循环中反复创建和拆除池。相反,在一开始就创建一次池:
def main_pool():
pool = Pool()
for i in range(10):
cpu_bound_task()
for j in range(10):
pool.map(func=cpu_bound_task_adapter, iterable=range(100))
pool.close()
现在运行时间为 20.9 秒。
如果我修改您的代码以明确指定池大小为 3,则修改后的版本将在 28.4 秒内完成。
推荐阅读
- c# - 如何使动态创建的对象的事件处理程序引用另一个类中的 DGV 对象?
- python - 文本清理后在 python 中的 pandas 列中保存更改
- mysql - For 循环在第一次迭代后停止
- python - tf-nightly-gpu 在整个训练过程中的性能下降
- javascript - Vuex 状态中的 Javascript 类中缺少方法
- c++ - 使用另一个结构构建链接列表 - 附加函数运行时错误
- javascript - 如何渲染类组件?
- r - 如何采样实数?
- java - 矩形节点的 Graphviz (Java) 标签写在节点的末端
- reactjs - Webpack 5 DependOn 仅在条目仅依赖于一个依赖项时才有效