首页 > 解决方案 > 多进程:持久池?

问题描述

我有如下代码:

def expensive(self,c,v):
    .....

def inner_loop(self,c,collector):
    self.db.query('SELECT ...',(c,))
    for v in self.db.cursor.fetchall() :
        collector.append( self.expensive(c,v) ) 

def method(self):

    # create a Pool
    #join the Pool ??

    self.db.query('SELECT ...')
    for c in self.db.cursor.fetchall() :
        collector = []

        #RUN the whole cycle in parallel in separate processes
        self.inner_loop(c, collector)

        #do stuff with the collector

    #! close the pool ?

外循环和内循环都是数千个步骤......我想我知道如何运行一个包含几个进程的池。我发现的所有例子都或多或少地表明了这一点。

但在我的情况下,我需要午餐一个持久池,然后提供数据(c 值)。一旦内循环过程完成,我必须提供下一个可用的 c 值。并保持流程运行并收集结果。

我怎么做 ?


我有一个笨拙的想法是:

def method(self):
 ws = 4
 with Pool(processes=ws) as pool :
     cs = []
     for i,c in enumerate(..) :
       cs.append(c)  
       if i % ws == 0 :
         res = [pool.apply(self.inner_loop, (c)) for i in range(ws)]
         cs = []
         collector.append(res)

这会保持同一个游泳池运行吗?即不是每次都午餐新流程?我


我是否需要 'if i % ws == 0' 部分,或者我可以使用 imap()、map_async() 和 Pool obj 将在可用工人用尽时阻塞循环并在一些被释放时继续?

标签: python-3.xmultiprocessingpoolpersistent

解决方案


是的,multiprocessing.Pool工作方式是:

池中的工作进程通常在池的工作队列的整个持续时间内存在。

因此,只需通过以下方式将所有工作提交到池中imap就足够了:

with Pool(processes=4) as pool:
    initial_results = db.fetchall("SELECT c FROM outer")
    results = [pool.imap(self.inner_loop, (c,)) for c in initial_results]

也就是说,如果您真的这样做是为了从数据库中获取内容,那么将更多处理向下移动到该层可能更有意义(将计算带到数据中,而不是将数据带到计算中)。


推荐阅读