python - Python Multiprocessing:如何从具有列表下一个元素的一组进程中再次运行一个进程?
问题描述
我有一个包含表名的列表,假设列表的大小为 n。现在我有 m 个服务器,所以我打开了 m 个与每个也在另一个列表中的游标对应的游标。现在对于每个表,我想调用一个将参数作为这两个列表的函数。
templst = [T1,T2,T3,T4,T5,T6, T7,T8,T9,T10,T11]
curlst = [cur1,cur2,cur3,cur4,cur5]
这些游标以 cur = conn.cursor() 打开,所以这些是对象
def extract_single(tableName, cursorconn):
qry2 = "Select * FROM %s"% (tableName)
cursorconn.execute(qry2).fetchall()
print " extraction done"
return
现在我已经打开了 5 个进程(因为我有 5 个游标)以便并行运行它们。
processes = []
x = 0
for x in range(5):
new_p = 'p%x'%x
print "process :", new_p
new_p = multiprocessing.Process(target=extract_single, args=(templst[x],cur[x]))
new_p.start()
processes.append(new_p)
for process in processes:
process.join()
所以这确保我为每个游标打开了 5 个进程,并且它使用了前 5 个表名。现在我希望 5 个进程中的任何进程完成后,它应该立即从我的 templst 中取出第 6 个表,并且同样的事情继续进行,直到所有 templst 完成。
如何为此行为修改此代码?例如,我想做什么简单的例子。让我们将 templst 视为我要为其调用 sleep 函数的 int
templst = [1,2,5,7,4,3,6,8,9,10,11]
curlst = [cur1,cur2,cur3,cur4,cur5]
def extract_single(sec, cursorconn):
print "Sleeping for second=%s done by cursor=%s"% (sec,cursorconn)
time.sleep(sec)
print " sleeping done"
return
因此,当我启动 5 个游标时,sleep(1) 或 sleep(2) 可能首先完成,因此一旦完成,我想用该游标运行 sleep(3)。
我的真正查询将取决于游标,因为它将是 SQL 查询
修改的方法 考虑前面的睡眠示例。我现在想实现我假设有 10 个游标,并且我的睡眠队列按升序或降序排序。考虑按升序排列的列表 现在,在 10 个游标中,前 5 个游标将从队列中获取前 5 个元素,而我的另一组 5 个游标将获取最后 5 个。所以基本上我的游标队列分为两半,取最低值,另一半取最高值。现在,如果前半部分的光标完成,它应该采用下一个可用的最小值,如果光标来自另一个后半部分,那么它应该采用第 (n-6) 个值,即从末尾开始的 6 个值。
我需要从两侧遍历队列并有两组游标,每组 5
example: curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10 ]
templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
so cur1 -> 1
cur2 ->2
... cur5 -> 5
cur6 -> 16
cur7 ->15
.... cur10->12
现在 cur1 首先完成,所以它需要 6(前面的第一个可用元素) cur2 finsihes 它需要 7 等等,如果 cur 10 finsihes 它将需要 11(后面的下一个可用元素)
依此类推,直到 templst 的所有元素。
解决方案
将您的templst
参数(无论是真实示例中的表名还是如下示例中的休眠秒数)放在多处理队列中。然后每个进程循环读取队列中的下一项。当队列为空时,没有更多工作要执行,您的进程可以返回。您实际上已经实现了自己的进程池,其中每个进程都有自己的专用游标连接。现在,您的函数extract_single
将要从中检索表名或秒参数的队列作为其第一个参数。
import multiprocessing
import Queue
import time
def extract_single(q, cursorconn):
while True:
try:
sec = q.get_nowait()
print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
time.sleep(sec)
print " sleeping done"
except Queue.Empty:
return
def main():
q = multiprocessing.Queue()
templst = [1,2,5,7,4,3,6,8,9,10,11]
for item in templst:
q.put(item) # add items to queue
curlst = [cur1,cur2,cur3,cur4,cur5]
process = []
for i in xrange(5):
p = multiprocessing.Process(target=extract_single, args=(q, curlst[i]))
process.append(p)
p.start()
for p in process:
p.join()
if __name__ == '__main__':
main()
笔记
如果您的处理器少于 5 个,您可以尝试使用 5 个(或更多)线程运行它,在这种情况下Queue
应该使用常规对象。
更新问题的更新答案
允许您从队列的前端和后端删除项目的数据结构称为双端队列(双端队列)。不幸的是,没有支持多处理的双端队列版本。但我认为您的表处理可能与线程一样好用,而且您的计算机不太可能有 10 个处理器来支持 10 个并发进程运行。
import threading
from collections import deque
import time
import sys
templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
q = deque(templst)
curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10]
def extract_single(cursorconn, from_front):
while True:
try:
sec = q.popleft() if from_front else q.pop()
#print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
sys.stdout.write("Sleeping for second=%s done by cursor=%s\n" % (sec,cursorconn))
sys.stdout.flush() # flush output
time.sleep(sec)
#print " sleeping done"
sys.stdout.write("sleeping done by %s\n" % cursorconn)
sys.stdout.flush() # flush output
except IndexError:
return
def main():
threads = []
for cur in curlst1:
t = threading.Thread(target=extract_single, args=(cur, True))
threads.append(t)
t.start()
for cur in curlst2:
t = threading.Thread(target=extract_single, args=(cur, False))
threads.append(t)
t.start()
for t in threads:
t.join()
if __name__ == '__main__':
main()
推荐阅读
- macos - Powerpoint 的官方文档 AppleScript 命令在哪里?
- omnet++ - INET wlan[0].radio 中的无线 NeSTiNg 主机:“接收开始:未尝试”
- amazon-web-services - 为不同用户使用不同权限保护 AWS S3 中文件的最佳方法是什么?
- python - 每 11 秒填充一次时间戳间隔
- java - 无法从 Map.Entry 转换
到字符串 - javascript - 将 js 承诺链转换为同步调用?
- uwp - Gaze Interaction 是否适用于使用其内置眼动仪的 HoloLens 2 上的 UWP 应用
- javascript - 如何使用 jQuery 定位 SVG 图像内的锚元素
- c - 尝试使用 strtok 从文件中分离字符串并存储在结构数组中
- html - 保持列之间的间隙:Bootstrap 3.3.7