首页 > 解决方案 > Python Multiprocessing:如何从具有列表下一个元素的一组进程中再次运行一个进程?

问题描述

我有一个包含表名的列表,假设列表的大小为 n。现在我有 m 个服务器,所以我打开了 m 个与每个也在另一个列表中的游标对应的游标。现在对于每个表,我想调用一个将参数作为这两个列表的函数。

templst = [T1,T2,T3,T4,T5,T6, T7,T8,T9,T10,T11]
curlst = [cur1,cur2,cur3,cur4,cur5]

这些游标以 cur = conn.cursor() 打开,所以这些是对象

def extract_single(tableName, cursorconn):
      qry2 = "Select * FROM %s"% (tableName)
      cursorconn.execute(qry2).fetchall()
      print " extraction done"
      return 

现在我已经打开了 5 个进程(因为我有 5 个游标)以便并行运行它们。

processes = []

x = 0
for x in range(5):
   new_p = 'p%x'%x
   print "process :", new_p
   new_p = multiprocessing.Process(target=extract_single, args=(templst[x],cur[x]))
   new_p.start()
   processes.append(new_p)


for process in processes:
    process.join()

所以这确保我为每个游标打开了 5 个进程,并且它使用了前 5 个表名。现在我希望 5 个进程中的任何进程完成后,它应该立即从我的 templst 中取出第 6 个表,并且同样的事情继续进行,直到所有 templst 完成。

如何为此行为修改此代码?例如,我想做什么简单的例子。让我们将 templst 视为我要为其调用 sleep 函数的 int

templst = [1,2,5,7,4,3,6,8,9,10,11]
curlst = [cur1,cur2,cur3,cur4,cur5]

def extract_single(sec, cursorconn):
      print "Sleeping for second=%s done by cursor=%s"% (sec,cursorconn)
      time.sleep(sec)
      print " sleeping done"
      return

因此,当我启动 5 个游标时,sleep(1) 或 sleep(2) 可能首先完成,因此一旦完成,我想用该游标运行 sleep(3)。

我的真正查询将取决于游标,因为它将是 SQL 查询

修改的方法 考虑前面的睡眠示例。我现在想实现我假设有 10 个游标,并且我的睡眠队列按升序或降序排序。考虑按升序排列的列表 现在,在 10 个游标中,前 5 个游标将从队列中获取前 5 个元素,而我的另一组 5 个游标将获取最后 5 个。所以基本上我的游标队列分为两半,取最低值,另一半取最高值。现在,如果前半部分的光标完成,它应该采用下一个可用的最小值,如果光标来自另一个后半部分,那么它应该采用第 (n-6) 个值,即从末尾开始的 6 个值。

我需要从两侧遍历队列并有两组游标,每组 5

example: curlst1 = [cur1,cur2,cur3,cur4,cur5]
         curlst2 = [cur6,cur7,cur8,cur9,cur10 ]
        templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]

so cur1 -> 1
   cur2 ->2
... cur5 -> 5
cur6 -> 16
cur7 ->15
.... cur10->12

现在 cur1 首先完成,所以它需要 6(前面的第一个可用元素) cur2 finsihes 它需要 7 等等,如果 cur 10 finsihes 它将需要 11(后面的下一个可用元素)

依此类推,直到 templst 的所有元素。

标签: pythonmultiprocessingpython-multiprocessingpyodbc

解决方案


将您的templst参数(无论是真实示例中的表名还是如下示例中的休眠秒数)放在多处理队列中。然后每个进程循环读取队列中的下一项。当队列为空时,没有更多工作要执行,您的进程可以返回。您实际上已经实现了自己的进程池,其中每个进程都有自己的专用游标连接。现在,您的函数extract_single将要从中检索表名或秒参数的队列作为其第一个参数。

import multiprocessing
import Queue
import time

def extract_single(q, cursorconn):
    while True:
        try:
            sec = q.get_nowait()
            print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
            time.sleep(sec)
            print " sleeping done"
        except Queue.Empty:
            return

def main():
    q = multiprocessing.Queue()
    templst = [1,2,5,7,4,3,6,8,9,10,11]
    for item in templst:
        q.put(item) # add items to queue
    curlst = [cur1,cur2,cur3,cur4,cur5]
    process = []
    for i in xrange(5):
        p = multiprocessing.Process(target=extract_single, args=(q, curlst[i]))
        process.append(p)
        p.start()
    for p in process:
        p.join()

if __name__ == '__main__':
    main()

笔记

如果您的处理器少于 5 个,您可以尝试使用 5 个(或更多)线程运行它,在这种情况下Queue应该使用常规对象。

更新问题的更新答案

允许您从队列的前端和后端删除项目的数据结构称为双端队列(双端队列)。不幸的是,没有支持多处理的双端队列版本。但我认为您的表处理可能与线程一样好用,而且您的计算机不太可能有 10 个处理器来支持 10 个并发进程运行。

import threading
from collections import deque
import time
import sys

templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
q = deque(templst)
curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10]

def extract_single(cursorconn, from_front):
    while True:
        try:
            sec = q.popleft() if from_front else q.pop()
            #print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
            sys.stdout.write("Sleeping for second=%s done by cursor=%s\n" % (sec,cursorconn))
            sys.stdout.flush() # flush output
            time.sleep(sec)
            #print " sleeping done"
            sys.stdout.write("sleeping done by %s\n" % cursorconn)
            sys.stdout.flush() # flush output
        except IndexError:
            return

def main():
    threads = []
    for cur in curlst1:
        t = threading.Thread(target=extract_single, args=(cur, True))
        threads.append(t)
        t.start()
    for cur in curlst2:
        t = threading.Thread(target=extract_single, args=(cur, False))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

if __name__ == '__main__':
    main()

推荐阅读