首页 > 技术文章 > 12.并发编程之线程

journeyer-xsh 2020-07-25 08:16 原文

一、线程基础和GIL锁

1.1 线程和进程

进程

  • 数据隔离,资源分配的最小单位,可以利用多核,操作系统调度,数据不安全,开启关闭切换时间开销大,一般开启的进程数不会超过cpu个数的两倍

线程

  • 同一个进程中的多个线程同时被cpu执行,数据共享,操作系统调度的最小单位,可以利用多核,操作系统调度,数据不安全,开启关闭切换时间开销小

竞态条件:两个或多个线程访问同一片数据,由于数据访问顺序不同,可能导致结果不同,即数据不安全。

python在设计时设定python虚拟机(解释器主循环)中只能有一个控制线程在执行,就像单核cpu上的多进程一样,程序轮流执行,任意时刻只能有一个程序在执行,这由全局解释器锁(GIL)控制的,保证同时只能有一个线程在执行。

cpython中多线程

  • gc 垃圾回收机制

    • 引用计数+分代回收:就是一个线程,随着一个线程的开启而开启,一旦有一个变量引用计数为0就被回收。
    • 全局解释器锁的出现主要是为了完成gc的回收机制,对不同的线程引用计数的变化记录更加精准
    • 全局解释器锁 GIL(global interpreter lock):导致了同一个进程中的多个进程只能有一个线程真正被cpu执行
  • 节省的是io操作的时间,而不是cpu时间,大部分情况下不能把一条进程中进程中所有的io操作避免。

二、threading

t.ident # 线程id

线程不能从外部terminate

所有子进程只能是自己执行完代码之后就关闭的

enumerate:返回列表,存储了所有活着的线程(包括主线程)。

active_count:返回int,存储活着的线程个数。

在函数中查看进程id:current_thread().ident()方法

2.1 开启线程方式一

import time
from threading import Thread, current_thread
import os

def func(i):
    print('start%s'%i, current_thread().ident())
    time.sleep(1)
    print('end%s'%i)



if __name__ == '__main__':
    t1 = []
    for i in range(10):
        t = Thread(target=func, args=(i,))
        t.start()
        print(t.ident, os.getpid())
        t1.append(t)
    for t in t1:t.join()

2.2 开启线程的方式二:面向对象

# 面向对象的方式创建线程
from threading import Thread
class MyThread(Thread):
    def __init__(self,a,b):
        self.a = a 
        self.b = b
        super().__init__()
    def run(self):
        print(self.ident)

t = MyThread(1,2)
t.start()
print(t.ident)

线程之间的数据是共享的

三、在一个进程下开启线程与在一个进程下开启多个子进程的区别

1、进程开销远大于线程

2、同一个进程内的多个线程共享地址空间

3、pid

4、线程都属于一个进程

四、线程对象的属性和方法

4.1 threading模块提供的一些方法

名称 描述
threading.enumerate() 返回一个包含正在运行的线程的list。
threading.activeCount() 返回正在运行的线程数量

4.2 Tread对象的一些方法

名称 描述
is_alive() 返回线程是否是活动的
getName() 返回线程名
setName() 设置线程名
from threading import Thread, currentThread     
from threading import active_count
from threading import enumerate
import time

def task():
    print('线程名是%s' %currentThread().getName() )
    time.sleep(1)

if __name__ == '__main__':
    t = Thread(target=task, name='子线程1')
    t.start()
    print(active_count())		# 得到进程活跃数
    print(enumerate())		# 返回一个包含正在运行的线程的list
    currentThread().setName('主线程')		# 设置主线程,默认是MainThread
    print(t.isAlive())		# 判断线程是否存活
    print('主线程', currentThread().getName())
    
# 线程名是子线程1
# 2
# [<_MainThread(MainThread, started 13236)>, <Thread(子线程1, started 7484)>]
# True
# 主线程 主线程

五、守护线程

一个进程内,如果不开线程,默认就是一个主线程,主线程代码运行完毕,进程被销毁。

守护线程:守护的主线程结束才会结束,如果有其它线程,会等待其他线程结束才结束。

无论是进程还是线程都遵循:守护xxx会等待主xxx运行完毕之后再销毁。

运行完毕的含义:

  1. 对于主进程来说,运行完毕就是指主进程代码运行完毕。
  2. 对主线程来说,运行完毕指的是主线程所在的几次呢很难过内所有非守护线程统统运行完毕。
from threading import Thread
import time

def func(name):
    time.sleep(2)
    print('线程%s'%name)


if __name__ == '__main__':
    t = Thread(target=func, args=('线程1',))
    # 守护线程必须在t.start()之前设置
    t.darmon = True
    # t.daemon(True)
    t.start()
    print('主线程')
    print(t.is_alive())
# 主线程
# True
# 线程线程1

六、线程锁

6.1 锁

注意:

​ 1、线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来。

  2、join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高。

DIL和Lock的区别:

  • GIL是解释器级别的锁,保护的是解释器级别的数据,比如垃圾回收的数据,
  • lock是保护用户自己开发的应用程序的数据(GIL不负责),需要用户自己加锁,即加Lock。

join()和Lock的区别:join()是等线程的所有代码执行完,相当于锁住了一个线程的所有代码,而lock只是锁住了一部分操作共享数据的代码。

锁通常用来实现对共享的资源的同步访问,为每一个共享资源创建一个Lock对象,当你需要访问该资源的时候,调用acquire()来获取锁对象(如果其他线程已经获得了该锁,则当线程需要等待其被释放,待资源访问完后,再调用release()释放锁。)

不加锁的示例:

from threading import Thread
import os,time

def work():
    global n
    temp = n
    time.sleep(0.32)
    n=temp-1


if __name__ == '__main__':
    n=100
    l = []
    for i in range(100):
        p = Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()    

    print(n)
# 结果为99
# 翻译成cpu指令
import dis
dis.dis(func)

+=,-=,*=,/= while if 数据不安全

append pop数据安全,列表中的方法或者字典中的数据安全

queue logging 数据安全的

a = c.strip() 带返回值的都是先计算后赋值。

互斥锁和join的区别

  • 不加锁:并发执行,速度快,数据不安全

  • 加锁:为加锁部分并发执行,加锁部分串行执行,速度慢,但是数据安全

  • 虽然start之后立即join,线程内的代码也是串行执行的,加锁只是加锁部分(修改共享数据的部分)是串行的,结果或者或者说目的是一样的,但是加锁的效率更高。

不用加锁:不要操作全局变量,不要在类里操作静态变量,就不用加锁。

单例模式

import time
# 实例化的时候只加载new,new上面的3行代码只加载一次,这就导致所有对象只用一把锁
# 函数使用时,
class A:
    from multiprocessing import Lock
    __instance = None
    lock = Lock()
    def __new__(cls, *args, **kwargs):
        with cls.lock:
            if not cls.__instance:
                time.sleep(0.000001)    # cpu轮转
                cls.__instance = super().__new__(cls)
            return cls.__instance

def func():
    a = A()
    print(a)		# 打印的内存地址都是一样的

from threading import Thread
for i in range(10):
    Thread(target=func).start()

6.2 死锁

死锁:是指两个或者多个进程或者线程在执行的过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,都无法运行下去。此时系统处于死锁状态,这些成为死锁进程。

互斥锁出现死锁现象,最快速的解决办法把所有的互斥锁都改成一把递归锁。效率降低。

6.3 互斥锁和递归锁

Lock 互斥锁 效率高

RLock 递归锁 因为递归,效率低

死锁问题的解决办法:递归锁可以连续acquire多次,每acquire一次计数器加1;只要计数不为0,就不能被其他线程抢到(只有计数为0,才能被抢到acquire),在Python中为了支持同一线程中多次请求同一资源,提供了可重入锁RLock

from multiprocessing import Lock,RLock

l = Lock()
l.acquire()
l.acquire()     # 此处仍被锁住
print('被锁住的代码') 
l.release()

# 递归锁,同一个线程中可以acquire多次
r1 = RLock()
r1.acquire()
r1.acquire()
print('被锁住的代码')
r1.release()
from threading import Thread, RLock
import time

"""链式赋值"""
mutexA=mutexB=RLock()   # 使用递归锁可以

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()    # 递归锁计数器加1
        print("%s 拿到A锁" % self.name)

        mutexB.acquire()
        print("%s 拿到了B锁" % self.name)

        mutexB.release()   # 递归锁计数器减1
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print("%s 拿到B锁" % self.name)
        time.sleep(1)   # 线程1在此休息0.1秒
        mutexA.acquire()
        print("%s 拿到了A锁" % self.name)

        mutexA.release()
        mutexB.release()    # 计数为0,其他线程可以抢acquire

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()   # 信号提交,就几乎立马启动了
# 第一个线程计数器为0 后,其他线程可以开始抢acquire,因此顺序是不固定的。

七、线程queue

queue队列:使用和import queue,用法和进程Queue一样

q.put()可以插入数据到队列

q.get()可以从队列中读取并删除该元素

7.1 queue.Queue(maxsize)

定义好队列存储数据的最大值,队列是存取规则是先进先出

import queue
q.queue.Queue(10)		# 队列的最大值是10
# 队列满了或者空了会直接阻塞

7.2 q.put()和q.get()方法的参数问题

import queue

q = queue.Queue(2)  
q.put("first")   # 放值进去
q.put(2)
# q.put(4)   # 队列满了,阻塞
# q.put(4, block=False)   # 默认是block=True, 改为False后,队列满了还加数据,程序报错raise Full queue.Full
# q.put(4, block=True, timeout=3)   # 设置了block=True队列满不会直接报错了,但是还加上了timeout=3,程序会等3秒后提示报错queue.Full

# 同理get()方法也有这些参数
print(q.get())   # 取数据
print(q.get())
print(q.get())
# print(q.get(block=False))   # 在队列空,还取时,一般是卡住,但加入了block=False参数的话,会提示报错queue.Empty
print(q.get_nowait())       # 这个效果同上
print(q.get(block=True, timeout=3))    # 队列空,还取数据时,会按照timeout时间等待,到时间后提示queue.Empty

queue的put()和get()方法参数问题

7.3 queue.LifeQueue(maxsize)

定义为堆栈,堆栈的存取规则是后进先出。

from queue import LifoQueue     # 栈
lq = LifoQueue()
lq.put(1)
lq.put(2)
lq.put(3)

print(lq.get())
print(lq.get())
print(lq.get())
# 
# 3
# 2
# 1

7.4 queue.PriorityQueue(maxsize)

优先级队列,存储数据时可设置优先级的队列。数字越小优先级越高。

from queue import PriorityQueue     # 优先级队列,vip购票
priq = PriorityQueue()
priq.put(2, 'an')
priq.put(1, 'bn')
priq.put(0, 'cn')

print(priq.get())
print(priq.get())
print(priq.get())
# 
# 0
# 1
# 2

推荐阅读