一、用fork创建多进程
import os import time ret=os.fork() print("ret=%d"%ret) if ret>0: #while(True): #print("1111111") print(" 父进程%d"%os.getpid()) time.sleep(1) print(" 父进程结束") else: while(True): #print("2222222") print("子进程%d %d"%(os.getpid(),os.getppid())) time.sleep(5) print("子进程结束") print("---over---")
import os import time g_num=100 ret=os.fork() print("ret=%d"%ret) if ret==0: g_num += 1 time.sleep(1) print(g_num) else: time.sleep(5) print(g_num) print("---over---")
import os import time ret=os.fork() if ret==0: print("1") else: print("2") ret=os.fork() if ret==0: print("111") else: print("222")
二、用Process创建进程
from multiprocessing import Process import time def test(): for i in range(5): print("1") time.sleep(2) p=Process(target=test) p.start() #p.join() #阻塞主进程,从此处等待子进程p结束后往下执行,join里面可以加上timeout时间,超时时间 print("父进程结束") #用Process创建的子进程,父进程等待所有的子进程结束后结束 #用fork创建的父进程结束,不受子进程影响
三、用继承Process创建进程
from multiprocessing import Process import time class MyNewProcess(Process): def run(self): time_start=time.time() for i in range(1,10): time.sleep(1) print("1") time_end=time.time() print("子程序运行时间:%d"%(time_end-time_start)) p=MyNewProcess() p.start() p.join(5) print("主程序结束")
四、用进程池创建多任务
from multiprocessing import Pool import os import time import random def worker(msg): t_start=time.time() print("%d开始执行,进程号为%d"%(msg,os.getpid())) time.sleep(random.random()*2) t_end=time.time() print(msg," 执行的时间为%0.2f"%(t_end-t_start)) p=Pool(3) for i in range(0,10): print("创建第%d个任务"%i) p.apply_async(worker,(i,)) print("-----------start------------") p.close() p.join() print("-----------end------------")
五、进程间通讯-消息队列
from multiprocessing import Process from multiprocessing import Queue import os import time import random def read(q): while(True): if(not q.empty()): t=q.get(True) print("读出一个数据%s"%t) time.sleep(random.random()) else: break def write(q): for i in ['a','b','c','d','e','f','g']: q.put(i) print("添加%s到队列"%i) time.sleep(random.random()) if __name__=='__main__': q = Queue() pw=Process(target=write,args=(q,)) pr=Process(target=read,args=(q,)) pw.start() pw.join() pr.start() pr.join()
六、进程池中进程互相通讯(阻塞方式)-消息队列
from multiprocessing import Pool from multiprocessing import Manager import os import time import random def read(q): while(True): if(not q.empty()): t=q.get(True) print("读出一个数据%s"%t) time.sleep(random.random()) else: break def write(q): for i in ['a','b','c','d','e','f','g']: q.put(i) print("添加%s到队列"%i) time.sleep(random.random()) if __name__=='__main__': q = Manager().Queue() po=Pool() po.apply(read,(q,)) po.apply(write,(q,)) po.close() po.join() print("%d End"%os.getpid())
七、进程池拷贝文件
import os from multiprocessing import Pool from multiprocessing import Manager #复制单个文件的函数,参数列表包括文件夹名称及路径,待复制的路径 def copy_file(name,oldfilepath,newfilepath,queue): fr=open(oldfilepath+'/'+name) fw=open(newfilepath+'/'+name,'w') content=fr.read() fw.write(content) fr.close() fw.close() queue.put(name) def main(): #提示输入要复制的文件夹及路径 old_file_path=input("请输入要复制的文件夹路径") #创建一个文件夹“复件”,要求输入新路径 new_file_path=old_file_path+"_复件" os.mkdir(new_file_path) #获取待复制文件夹的所有文件名 filenames=os.listdir(old_file_path) #创建多进程进行复制 p=Pool(5) print("进程池已准备好") q=Manager().Queue() print("进程间队列已准备好") for name in filenames: print("准备复制文件“%s”"%name) p.apply_async(copy_file,(name,old_file_path,new_file_path,q)) #队列统计已复制的文件名及个数 sum=len(filenames) wanchengnum=0 while(wanchengnum<sum): q.get() wanchengnum+=1 print("共%d个文件,已经复制%d个文件"%(sum,wanchengnum)) p.close() p.join() if __name__ == '__main__': main()