首页 > 技术文章 > python-day32(进程池,管道,数据共享)

Thui 2018-11-29 17:05 原文

一. 管道

  进程间通信(IPC)  管道(不推荐使用),会导致数据不安全的情况出现,

  conn1,conn2 = Pipe 表示管道两端的连接对象,强调一点: 必须在产生Process对象之前产生管道

  主要方法:

    conn1.recv(): 接收从conn2.send(obj)对象,如果没有消息可接收,recv方法会一直阻塞,

      如果连接的另一端已经关闭,namerecv方法会抛出EOFError

    conn1.send(obj): 通过连接发送对象,obj是与序列化兼容的任意对象

  其他方法:

    conn1.close(): 关闭连接, 如果conn1被垃圾回收,将自动调用次方法

 1 from multiprocessing import Process, Pipe
 2 
 3 def f(conn):
 4     conn.send("Hello 妹妹") #子进程发送了消息
 5     conn.close()
 6 
 7 if __name__ == '__main__':
 8     parent_conn, child_conn = Pipe() #建立管道,拿到管道的两端,双工通信方式,两端都可以收发消息
 9     p = Process(target=f, args=(child_conn,)) #将管道的一段给子进程
10     p.start() #开启子进程
11     print(parent_conn.recv()) #主进程接受了消息
12     p.join()
管道的使用, 使用不当各种报错

二. 数据共享

  Manager:进程间数据是独立的,可以借助于队列或者管道实现通信,二者都是基于消息传递的

      虽然进程间数据独立,但可以通过Manager实现数据共享

  多进程共同去处理共享数据的时候,就和我们多进程同时去操作一个文件中的数据是一样的,

  不加锁就会出现错误的结果,进程不安全的,所以也需要加锁

 1 from multiprocessing import Manager,Process,Lock
 2 def work(d,lock):
 3     with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
 4         d['count']-=1
 5 
 6 if __name__ == '__main__':
 7     lock=Lock()
 8     with Manager() as m:
 9         dic=m.dict({'count':100})
10         p_l=[]
11         for i in range(100):
12             p=Process(target=work,args=(dic,lock))
13             p_l.append(p)
14             p.start()
15         for p in p_l:
16             p.join()
17         print(dic)
Manager 模块使用

 (进程间的通信: 管道,队列,数据共享)

三. 进程池 (重点)

  创建进程池的类: 如果指定numprocess参数为3,则进程池会从无到有创建三个进程,然后

  自始至终使用这三个进程去执行所有任务,不会开启其他进程,提高操作系统效率,减少空间的占用

  创建进程池 p = Pool(4) #四个进程 如果省略,将默认使用 cpu_count() 的值

  p.apply()  同步提交任务,直接可以收到返回值

 1 import os,time
 2 from multiprocessing import Pool
 3 
 4 def work(n):
 5     print('%s run' %os.getpid())
 6     time.sleep(1)
 7     return n**2
 8 
 9 if __name__ == '__main__':
10     p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
11     res_l=[]
12     for i in range(10):
13         res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
14                                     # 但不管该任务是否存在阻塞,同步调用都会在原地等着
15         res_l.append(res)
16     print(res_l)
同步调用

  p.apply_async() 异步提交任务: res.get() 阻塞效果

 1 import os
 2 import time
 3 import random
 4 from multiprocessing import Pool
 5 
 6 def work(n):
 7     print('%s run' %os.getpid())
 8     time.sleep(random.random())
 9     return n**2
10 
11 if __name__ == '__main__':
12     p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
13     res_l=[]
14     for i in range(10):
15         res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了。
16                                           # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
17                                           # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
18                                           # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。  
19         res_l.append(res)
20 
21     # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用join,等待进程池内任务都处理完,然后可以用get收集结果
22     # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
23     p.close() #不是关闭进程池,而是结束进程池接收任务,确保没有新任务再提交过来。
24     p.join()   #感知进程池中的任务已经执行结束,只有当没有新的任务添加进来的时候,才能感知到任务结束了,所以在join之前必须加上close方法
25     for res in res_l:
26         print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
异步调用

 

    close join

  p.map()  异步提交任务,参数是可迭代对象,自带close + join

 1 import time
 2 from multiprocessing import Process,Pool
 3 
 4 
 5 def func(i):
 6     num = 0
 7     for j in range(5):
 8         num += i
 9 
10 
11 if __name__ == '__main__':
12     pool = Pool(4)  #
13     p_list = []
14     start_time = time.time()
15     for i in range(100):
16         p = Process(target=func,args=(i,))
17         p_list.append(p)
18         p.start()
19     [pp.join() for pp in p_list]
20     end_time = time.time()
21     print(end_time - start_time)
22 
23     s_time = time.time()
24     pool.map(func,range(100))  #map
25     e_time = time.time()
26     print(e_time - s_time)
map

 回调函数:callback

 1 import os
 2 from multiprocessing import Pool
 3 
 4 def func1(n):
 5     print('func1>>',os.getpid())
 6     print('func1')
 7     return n*n
 8 
 9 def func2(nn):
10     print('func2>>',os.getpid())
11     print('func2')
12     print(nn)
13     # import time
14     # time.sleep(0.5)
15 if __name__ == '__main__':
16     print('主进程:',os.getpid())
17     p = Pool(5)
18     #args里面的10给了func1,func1的返回值作为回调函数的参数给了callback对应的函数,不能直接给回调函数直接传参数,他只能是你任务函数func1的函数的返回值
19     # for i in range(10,20): #如果是多个进程来执行任务,那么当所有子进程将结果给了回调函数之后,回调函数又是在主进程上执行的,那么就会出现打印结果是同步的效果。我们上面func2里面注销的时间模块打开看看
20     #     p.apply_async(func1,args=(i,),callback=func2)
21     p.apply_async(func1,args=(10,),callback=func2)
22 
23     p.close()
24     p.join()
25 
26 #结果
27 # 主进程: 11852  #发现回调函数是在主进程中完成的,其实如果是在子进程中完成的,那我们直接将代码写在子进程的任务函数func1里面就行了,对不对,这也是为什么称为回调函数的原因。
28 # func1>> 17332
29 # func1
30 # func2>> 11852
31 # func2
32 # 100
View Code
1 需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。
2 
3 我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果
回调函数

 

其他方法:
p.close():关闭进程池,防止进一步操作
p.join():等待所有工作进程退出,次方法只能在close()或teminate()之后调用

更多点击

 

  

 

  

 

  

  

推荐阅读