首页 > 技术文章 > Python多进程笔记(Python_MultiProcess)

Lilyan 2020-07-12 23:09 原文

1 MutiProcessing(多进程)使用:

a. 什么是多进程?

在上面我们使用多线程去分别处理不同的事情,看起来,多线程处理并不比单线程循环处理的效率看起来那么的高。多进程是在利用我们电脑CPU多核的特性,去提高我们处理多个事情的效率。

b. 创建一个进程:

创建一个进程我们需要先导入涉及到进程的模块import multiprocessing as mp

进程拥有类似线程的方法。举个例子如下:

import multiprocessing as mp

def job(a, b):
    print(a + b)

if __name__ == '__main__':
    p1 = mp.Process(target= job, args = (1, 2))
    p1.start()
    p1.join()

c. 多进程中的Queue的使用:

在上面提到的多线程中使用Queue来保存工作结果以及多个线程之间的通信。在多进程中,Queue的作用依然是保存工作结果以及进行通信。让我们看下面例子来使用Queue保存result,如下:

import multiprocessing as mp

def job(q):
    res = 0
    for i in range(1000):
        res += i + i ** 2 + i ** 3
    q.put(res)

if __name__ == '__main__':
    q = mp.Queue()
    p1 = mp.Process(target= job, args = (q, )) #注意这里的args即使只有一个参数,也必须加上逗号,表示这个参数是一个可迭代的对象
    p2 = mp.Process(target= job, args = (q, ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('Process1 compute result1: %d' % res1)
    print('Process2 compute result2: %d' % res2)

d. 多线程与多进程以及正常处理的效率对比:

请看以下例子,我们使用双核来测试,创建两个进程,创建两个线程,在单线程进行两次循环,进行相同的job。比较三者的效率可发现多进程在计算上会优于多线程以及普通方法。而由于GIL的存在,多线程看起来是分配Job给不同的线程进行处理,但是在数量级大的操作上,他会因为在单核中切换线程耗费不少时间。因此,多线程的处理效果往往不如单线程的循环操作。例子如下:

import multiprocessing as mp
import threading as tp
import time
def job(q):
    res = 0
    for i in range(1000000):
        res += i + i ** 2 + i ** 3
    q.put(res)

def MultiProcessJob():
    q = mp.Queue()
    p1 = mp.Process(target= job, args = (q, )) #注意这里的args即使只有一个参数,也必须加上逗号,表示这个参数是一个可迭代的对象
    p2 = mp.Process(target= job, args = (q, ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('Process1 compute result1: %d' % res1)
    print('Process2 compute result2: %d' % res2)

def MultiThreadJob():
    q = mp.Queue()
    t1 = tp.Thread(target= job, args = (q, )) #注意这里的args即使只有一个参数,也必须加上逗号,表示这个参数是一个可迭代的对象
    t2 = tp.Thread(target= job, args = (q, ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res3 = q.get()
    res4 = q.get()
    print('Thread1 compute result3: %d' % res3)
    print('Thread2 compute result4: %d' % res4)

def NormalJob():
    res = 0
    for i in range(2):
        for j in range(1000000):
            res += i + i ** 2 + i ** 3
    print('Normal compute result5: %d' % res)

if __name__ == '__main__':
    st = time.time()
    NormalJob()
    ft = time.time()
    print('Normal Time is:', ft - st)
    st1 = time.time()
    MultiThreadJob()
    ft1 = time.time()
    print('MultiThread Time is', ft1 - st1)
    st2 = time.time()
    MultiProcessJob()
    ft2 = time.time()
    print('MultiProcess Time is', ft2 - st2)
'''
result:
Normal compute result5: 3000000
Normal Time is: 1.0905730724334717
Thread1 compute result3: 249999833333583333000000
Thread2 compute result4: 249999833333583333000000
MultiThread Time is 1.2889320850372314
Process1 compute result1: 249999833333583333000000
Process2 compute result2: 249999833333583333000000
MultiProcess Time is 0.668349027633667
'''

e. 进程池Pool的使用:

进程池是给我们自动的分配计算机中CPU多核资源进行计算,比如你要使用4个进程去分别处理相同的job,手动则需要建立四个Process,会比较麻烦。所以引入进程池创建进程使用CPU多核资源相当简便。

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
参考下面代码

import multiprocessing as mp

def job(x) :
    return x * x

def multiProcess():
    pool = mp.Pool(processes = 4) #利用4核处理
    res = pool.map(job, range(100)) #将结果map组合起来
    print(res)
    res = pool.apply_async(job, (2, )) #apply_async方法是异步阻塞,不用等待当前进程执行完毕,随时根据系统调度来进行切换
    print(res.get())
    multi_res = [pool.apply_async(job, (i, )) for i in range(100)]
    print([res.get() for res in multi_res])

if __name__ == '__main__':
    multiProcess()

f. 共享内存以及进程锁(shared memory & lock)

共享内存的意思是,你的计算机CPU有多核处理器,假设我们的CPU是一个四核处理器,我们想让Process1去计算完一个变量,然后我们想让Process2在这个变量上继续进行操作。那么我们就要使用共享内存。使用multiprocessing.Value('i', int类型的数值).之所以使用lock是保证各个进程之间单独执行完,不会发生抢占式对共享内存中的数据进行修改。例子如下:

import multiprocessing as mp
import time

def job(v, num, lock):
    lock.acquire()
    for i in range(10):
        time.sleep(0.2)
        v.value += num
        print(v.value)
    lock.release()

def multiProcess():
    lock = mp.Lock()
    v = mp.Value('i', 0)
    p1 = mp.Process(target=job, args=(v, 10, lock))
    p2 = mp.Process(target=job, args=(v, 100, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multiProcess()

这个Code主要是给定两个进程同时对共享内存中的数据操作,Process1进行add 10, Process2进行add 100.通过lock锁控制各个进程的compute互不干扰。
Code欢迎给个star

推荐阅读