首页 > 技术文章 > 多线程 Threading Multiprocessing(Python)

eugene0 2019-09-18 22:27 原文

多线程是加速程序计算的有效方式,Python的多线程模块threading上手快速简单,学习莫烦多线程教程动手操作了一遍,这里记录一下。

1 Threading

1.1 添加线程

import threading
#获取已激活的线程数
print(threading.active_count())  #1

#查看所有线程信息
print(threading.enumerate())  #[<_MainThread(MainThread, started 18496)>]

#查看现在正在运行的线程
print(threading.current_thread()) #<_MainThread(MainThread, started 18496)>


import threading

def thread_job():
    print('This is a thread of %s' % threading.current_thread())

def runMain():
    thread = threading.Thread(target=thread_job,) #定义线程
    thread.start()  #线程开始工作

if __name__ == '__main__':
    runMain()

#输出
This is a thread of <Thread(Thread-1, started 12324)>

1.2 join功能

不加join功能,线程任务还未完成便输出all done。

import threading
import time

def thread_job():
    print('T1 start.\n')
    for i in range(10):
        time.sleep(0.1)  #任务间隔0.1秒
    print('T1 finish.\n')

add_thread = threading.Thread(target=thread_job, name='T1')
add_thread.start()

print('all done.\n')
#输出
T1 start.
all done.
T1 finish.

若要遵循顺序,在启动线程后调用join , 使用join控制多个线程的执行顺序,效果如下。

import threading
import time

def thread_job():
    print('T1 start.\n')
    for i in range(10):
        time.sleep(0.1)  #任务间隔0.1秒
    print('T1 finish.\n')

add_thread = threading.Thread(target=thread_job, name='T1')
add_thread.start()
add_thread.join()

print('all done.\n')
#输出
T1 start.
T1 finish.
all done.

1.3 存储进程结果Queue

将数据列表中的数据传入,使用四个线程处理,将结果保存在Queue中,线程执行完后,从Queue中获取存储的结果

#导入线程 队列的标准模块
import threading
import time
from queue import Queue

定义一个被多线程调用的函数:函数的参数时一个列表l和一个队列q,函数的功能是对列表的每个元素进行平方计算,将结果保存在队列中

def job(l,q):
    for i in range(len(l)):
        l[i] = l[i]**2
    q.put(l)

定义一个多线程函数:在多线程函数中定义一个Queue ,用来保存返回值 ,代替return ,定义一个多线程列表 ,初始化一个多维数据列表

def mulithreading():
    q = Queue() #q中存放返回值 代替return的返回值
    threads = []
    data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]

在多线程函数中定义四个线程,启动线程,将每个线程添加到多线程的列表中

for i in range(4): #定义四个线程
        t = threading.Thread(target=job,args=(data[i],q))
        t.start()
        threads.append(t) #把每个线程append到线程列表中

分别join四个线程到主线程

for thread in threads:
        thread.join()

定义一个空列表results 将四个线程运行后保存在队列中的结果返回给results

results = []
    for _ in range(4):
        results.append(q.get()) #q.get()按顺序从q中拿出一个值
    print(results)

完整代码:

#导入线程 队列的标准模块
import threading
import time
from queue import Queue

#定义一个被多线程调用的函数
def job(l,q):
    for i in range(len(l)):
        l[i] = l[i]**2
    q.put(l)
#定义一个多线程函数
def mulithreading():
    q = Queue() #q中存放返回值 代替return的返回值
    threads = []
    data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]
    for i in range(4): #定义四个线程
        t = threading.Thread(target=job,args=(data[i],q))
        t.start()
        threads.append(t) #把每个线程append到线程列表中
    #分别join四个线程到主线程
    for thread in threads:
        thread.join()
    #定义一个空列表results 将四个线程运行后保存在队列中的结果返回给results
    results = []
    for _ in range(4):
        results.append(q.get()) #q.get()按顺序从q中拿出一个值
    print(results)
if __name__ == '__main__':
    mulithreading()

#输出
[[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]

1.4 GIL 不一定有效率

python 的多线程 threading 有时候并不是特别理想. 最主要的原因是就是, Python 的设计上, 有一个必要的环节, 就是 Global Interpreter Lock (GIL). 这个东西让 Python 还是一次性只能处理一个东西.

import threading
from queue import Queue
import copy
import time

def job(l, q):
    res = sum(l)
    q.put(res)

def multithreading(l):
    q = Queue()
    threads = []
    for i in range(4):
        t = threading.Thread(target=job, args=(copy.copy(l), q), name='T%i' % i)
        t.start()
        threads.append(t)
    [t.join() for t in threads]
    total = 0
    for _ in range(4):
        total += q.get()
    print(total)

def normal(l):
    total = sum(l)
    print(total)

if __name__ == '__main__':
    l = list(range(10000000))
    s_t = time.time()
    normal(l*4)
    print('normal: ',time.time()-s_t)
    s_t = time.time()
    multithreading(l)
    print('multithreading: ', time.time()-s_t)
#输出
199999980000000
normal:  1.7343778610229492
199999980000000
multithreading:  2.218825340270996

程序 threading 和 Normal 运行了一样多次的运算. 但是我们发现 threading 却没有快多少, 按理来说, 我们预期会要快3-4倍, 因为有建立4个线程, 但是并没有. 这就是其中的 GIL 在作怪.

1.5 线程锁

不使用锁

import threading

def job1():  #全局变量A的值每次加1,循环10次
    global A
    for i in range(10):
        A += 1
        print('job1',A)

def job2(): #全局变量A的值每次加10,循环10次
    global A
    for i in range(10):
        A += 10
        print('job2',A)

if __name__ == '__main__':
    A = 0
    t1 = threading.Thread(target=job1)
    t2 = threading.Thread(target=job2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

#输出 打印结果非常混乱
job1 1
job1 2
job1 3
job1 4
job2 14
job1 15
job2 25
job1 26
job2 36
job1 37
job2 47
job1 48
job2 58
job1 59
job2 69
job1 70
job2 80
job2 90
job2 100
job2 110

使用锁
lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。

import threading

def job1():
    global A;lock = threading.Lock()
    lock.acquire()
    for i in range(10):
        A += 1
        print('job1',A)
    lock.release()

def job2():
    global A;lock = threading.Lock()
    lock.acquire()
    for i in range(10):
        A += 10
        print('job2',A)
    lock.release()

if __name__ == '__main__':
    A = 0
    t1 = threading.Thread(target=job1)
    t2 = threading.Thread(target=job2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

#输出  使用lock后 执行完一个线程后再执行另一个线程。使用lock和不使用lock,最后打印输出的结果是不同的。
job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job1 9
job1 10
job2 20
job2 30
job2 40
job2 50
job2 60
job2 70
job2 80
job2 90
job2 100
job2 110

2 Multiprocessing

多进程 Multiprocessing 和多线程 threading 类似, 都是在 python 中用来并行运算的。不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 因为要用来弥补 threading 的一些劣势, 比如在 threading 教程中提到的GIL, python 把 multiprocessing 和 threading 的使用方法做的几乎差不多,使用多线程发挥电脑多核系统的威力。

2.1添加Process

#导入线程进程标准模块
import multiprocessing as mp
import threading as td

#定义一个被线程和进程调用的函数
def job(a,d):
    print('AA')

#创建线程和进程
t1=td.Thread(target=job,args=(1,2))
p1=mp.Process(target=job,args=(1,2))

#启动线程和进程
t1.start()
p1.start()

#连接线程和进程
t1.join()
p1.join()

#可以看出线程和进程的使用方式相似

完整代码

#导入线程进程标准模块
import multiprocessing as mp
import threading as td

#定义一个被进程调用的函数
def job(a,d):
    print('AA')

if __name__ == '__main__':
    p1 = mp.Process(target=job, args=(1, 2)) #创建进程
    p1.start() #启动进程
    p1.join()  #连接进程
#输出
AA

2.2 存储进程输出 Queue

Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。因为多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果。
定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果

#定义一个多线程调用函数
def job(q): #注:该函数没有返回值
    res = 0
    for i in range(1000):
        res += i+i**2+i**3
    q.put(res) #queue

定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错

p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))

完整代码实现

import multiprocessing as mp

#定义一个多线程调用函数
def job(q): #注:该函数没有返回值
    res = 0
    for i in range(1000):
        res += i+i**2+i**3
    q.put(res) #queue
if __name__ == '__main__':
    q=mp.Queue()  #定义一个多线程队列 存储结果
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start() #启动线程  分两批处理
    p2.start()
    p1.join() #连接线程
    p2.join()
    res1=q.get() #分两批输出 将结果分别保存
    res2=q.get()
    print(res1+res2)
#输出
499667166000

2.3效率对比

对比下多进程,多线程和什么都不做时的消耗时间,看看哪种方式更有效率。

import multiprocessing as mp
def job(q):
    res=0
    for i in range(1000000):
        res += i + i**2 + i**3
    q.put(res)

#由于多进程是多核运算 多进程代码命名为multicore()
def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:',res1 + res2)

#创建多线程
import threading as td
def multithread():
    q = mp.Queue() # thread可放入process同样的queue中
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1 + res2)

#创建普通函数
def normal():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i**2 + i**3
    print('normal:', res)

import time
if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread()
    st2 = time.time()
    print('multithread time:', st2 - st1)
    multicore()
    print('multicore time:', time.time() - st2)

#输出
normal: 499999666667166666000000
normal time: 1.6875250339508057
multithread: 499999666667166666000000
multithread time: 3.1562907695770264
multicore: 499999666667166666000000
multicore time: 1.0937612056732178

这次运行时间依然是:多进程 < 普通 < 多线程。 发现多核/多进程最快,说明在同时间运行了多个任务。 而多线程的运行时间居然比什么都不做的程序还要慢一点,说明多线程还是有短板。

2.4 进程池Pool

进程池就是将所要运行的东西,放到池子里,Python会自行解决多进程的问题
2.4.1 进程池Pool()和map()

#定义一个Pool
pool = mp.Pool()

有了池子之后,就可以让池子对应某一个函数,向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。
接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果 res = pool.map(job, range(10))

import multiprocessing as mp

def job(x):
    return x*x

def multicore():
    pool = mp.Pool()
    res = pool.map(job,range(10))
    print(res)

if __name__ == '__main__':
    multicore()

#输出
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.4.2 自定义核数量
怎么知道Pool是否真的调用了多个核呢?可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况
打开CPU负载:活动监视器 > CPU > CPU负载(单击一下即可)
Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量,

def multicore():
    pool = mp.Pool(processes=3) # 定义CPU核数量为3
    res = pool.map(job, range(10))
    print(res)

2.4.3 apply_async()
Pool除了map()外,还有可以返回结果的方式,就是apply_async()。apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get获得结果
    print(res.get())

#运行结果
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4

2.4.4 apply_async()输出多个结果
在apply_async()中多传入几个值
res = pool.apply_async(job, (2,3,4,)) #报错 TypeError: job() takes exactly 1 argument (3 given) 即apply_async()只能输入一组参数。
将apply_async() 放入迭代器中,定义一个新的multi_res
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
取出值时需要一个一个取出来
print([res.get() for res in multi_res])
合并代码

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get获得结果
    print(res.get())
    # 迭代器,i=0时apply一次,i=1时apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 从迭代器中取出
    print([res.get() for res in multi_res])
#运行结果

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res
  • Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
  • map() 放入迭代参数,返回多个结果
  • apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代

2.5 共享内存 shared memory

**2.5.1 Shared Value **
使用Value数据存储在一个共享的内存表中。

import multiprocessing as mp

value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)

其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型
2.5.2 Shared Array
在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据
array = mp.Array('i', [1, 2, 3, 4])
这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。

2.6 进程锁

2.6.1 不加锁

import multiprocessing as mp

def job(x):
    return x*x

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get获得结果
    print(res.get())
    # 迭代器,i=0时apply一次,i=1时apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 从迭代器中取出
    print([res.get() for res in multi_res])

if __name__ == '__main__':
    multicore()
#输出
1
4
5
8
9
12
13
16
17
20

上面代码中定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1和p2 中设定了不同的累加值。所以来看下这两个进程是否会出现冲突。
2.6.2 加锁

import multiprocessing as mp
import time

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1)
        v.value += num # 获取共享内存
        print(v.value)
    l.release() # 释放

def multicore():
    l = mp.Lock() # 定义一个进程锁
    v = mp.Value('i', 0) # 定义共享内存
    p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
    p2 = mp.Process(target=job, args=(v,3,l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()
#运行一下,看看是否还会出现抢占资源的情况

1
2
3
4
5
8
11
14
17
20

运行结果显示,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

推荐阅读