首页 > 技术文章 > python并发和性能(进程,线程,协程,队列)

jiangmingbai 2019-06-09 17:04 原文

一、并发和并行

1、多任务

  • 多任务的概念
    • 简单的说,就事操作系统可以同时运行多个任务
  • CPU与多任务的关系:
  • 单核CPU可不可以多任务?
    • 也可以执行多任务,由于CPU执行代码都是顺序执行的,那么单核CPU是怎么执行多任务的呢?
    • 答案就是操作系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,在切换到任务3,执行0.01秒.......这样反复下去,表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。
  • 真正的并行执行多任务只能在多核CPU上实现,但是由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把多任务轮流调度到每个核心上执行。

2、并发和并行

  • 并发: 指的是任务数多于CPU核数,通过操作系统的各种任务调度算法,实现用多任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起在执行而已)
  • 并行: 指的是任务数小于等于CPU核数,即任务真的在一起执行的
  • 并发

  • 并行

3、同步和异步

  • 同步(同步协调): 是指线程在访问某一资源时,获得了资源的返回结果之后才会执行其他操作,(先做某件事,再做某件事)
  • 异步: 与同步相对,是指线程再访问某一资源时,无论是否取得返回结果,都进行下一步操作;当有了资源返回结果时,系统自会通知线程。

二、线程

关于线程得官方文档

  • 问题:
    • 当前有两件事情,做事情1需要5秒,做事情2需要6秒
def func1():
    for i in range(5):
        print("-----正再做事情1------")
        time.sleep(1)


def func2():
    for i in range(6):
        print("-----正再做事情2------")
        time.sleep(1)
  • 单任务
    • 先做事情一
    • 在做事情二
  • 多任务
    • 两个事情同时做
    • 怎么才能同时做呢?
      • 多线程执行

1、Threadting模块介绍

Python的_thread模块是比较底层的模块,Python的threading模块是怼thread模块做了一层包装的,可以更加方便的使用

  • 创建多线程:threading.Thread(target=func1)
  • 参数target指定线程执行的任务(函数)

Thread类提供了以下方法

  • run() :用以表示线程活动的方法
  • start() :启动线程活动
  • join(timeout) :等待子线程的方法,默认等待子线程执行结束
  • lsAlive() :返回线程是否活动的
  • getName() :返回线程名
  • setName() :设置线程名

threading提供的以下方法

  • threading.current_thread() :返回当前执行的线程
  • threading.enumerate() :返回正在运行的所有线程(list)
  • threading.active_count() :返回正在运行的线程数量

2、多线线程实现多任务

  • 利用threading模块实现
import threading
import time


def fun_1():
    for i in range(5):
        print("线程{}:{}".format(threading.current_thread(), i))
        time.sleep(1)


def fun_2():
    for i in range(6):
        print("线程{}:{}".format(threading.current_thread(), i))
        time.sleep(1)


def main():
    # 建立线程1 执行函数1
    t1 = threading.Thread(target=fun_1, name="th_1")
    # 建立线程2 执行函数2
    t2 = threading.Thread(target=fun_2, name="th_2")
    s_time = time.time()
    # fun_1()
    # fun_2()
    # 设置名字
    t1.setName("TH_1")
    print(t1.getName())
    # 开始
    t1.start()
    t2.start()
    # 查看线程集合,和线程数
    print(threading.enumerate())
    print(threading.active_count())
    # 等待子线程执行完毕
    t1.join()
    t2.join()
    e_time = time.time()
    print("时间:", e_time - s_time)


if __name__ == '__main__':
    main()

  • 重写run方法实现多线程
    • 线程启动start方法就是运行了run方法
    • 通过继承Thread类,重写run方法,来创见线程
    • 创建线程时不需要在传任务函数
    • 如果任务函数需要参数,需要重写init方法,并返回父类init方法
import threading
import time
import requests

# 计算时间装饰器
def add_time(fun):
    def inner(*args):
        start_time = time.time()
        fun(*args)
        end_time = time.time()
        print("---------------线程:{},执行时间:{}-----------------".format(threading.current_thread(), end_time - start_time))
        return end_time - start_time

    return inner


# 重写run方法创建多线程
class RequestThread(threading.Thread):
    def __init__(self, url):
        self.url = url
        super().__init__()

    @add_time
    def run(self):
        for i in range(100):
            res = requests.get(self.url).status_code
            print("线程{}得执行结果为:{}".format(threading.current_thread(), res))


@add_time
def main():
    t_list = []  # 线程列表
    # 建立线程
    for i in range(10):
        t = RequestThread("http://httpbin.org/post")
        t_list.append(t)
    # 开启线程
    for th in t_list:
        th.start()
    # 等待线程结束
    for th in t_list:
        th.join()


if __name__ == '__main__':
    total_time = main()
    print("每个接口平均时间:", total_time / 1000)

3、多线程-共享全局变量

  • 问题:1000000次的bug
    • 两个线程完成对全局变量的2百万次修改
    • 一个线程修改1百万次
    # 最后结果
    # 线程2修改完a = 1269517
    # 线程1修改完a = 1302318
    # 最后修改完a = 1302318
  • 可见增加了两百万次,a只有一百多万
    • 因为多线程对全局变量的修改时不安全的
      • 比如线程1获取了a的值为10w,此时切换到线程2
      • 线程2修改到20w去换到线程1
      • 此时线程1对a的修改不是从20w开始,而是10w,所以会导致a被重新覆盖了,导致全局变量不安全的情况发生

4、同步&互斥锁

  • 上面的bug如何解决?
    • 控制线程的执行,避免同时获取数据
  • 互斥锁
    • 线程同步能够保证保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁
    • 互斥锁为资源设定一个状态:锁定/非锁定
    • 某个线程要更改共享数据时,先将其锁定,此时资源的状态为"锁定",其他线程不能更改直到该线程释放了资源,将资源状态变成“非锁定”,其它线程才可以去获取锁,然后再次锁定该资源
    • 互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性
  • threading模块中定义了Lock类,可以方便的处理锁定
    • 创建锁:lock = threading.Lock()
    • 锁定 : lock.acquire()
    • 释放锁 :lock.release()
  • 注意
    • 如果这个锁之前没有上锁的,那么acquire不会阻塞
    • 如果在调用acpuire对这个锁上锁之前,它被其他线程上了锁,那么此时acpuire会阻塞,直到这个锁被其他线程解锁为止
    • 上锁之后多线程执行肯定会变慢
      • 上锁,释放锁需要时间
      • 上了锁的代码块等于同步执行,必须执行完锁内的代码块,并释放了锁,其他线程才有机会获取锁去执行。等于多线程锁内的代码是同步执行的,在只考虑锁内代码块的执行效率,甚至比单线程还差
  • 案例
import threading

# 全局变量,利用多线程
# 对a进行+1 ,两千万次,一个线程加已一千万次
import time

a = 0


def fun_1(lock):
    global a
    for i in range(1000000):
        # 修改前上锁
        lock.acquire()
        a += 1
        # 修改完释放锁
        lock.release()
    print("线程1修改完a={}".format(a))


def fun_2(lock):
    global a
    for i in range(1000000):
        # 修改前上锁
        lock.acquire()
        a += 1
        # 修改完释放锁
        lock.release()
    print("线程2修改完a={}".format(a))


def main():
    # 创建锁
    lock = threading.Lock()

    # 建立两个线程,把锁传进任务上锁
    s_time = time.time()
    t1 = threading.Thread(target=fun_1, args=(lock,))
    t2 = threading.Thread(target=fun_2, args=(lock,))

    t1.start()
    t2.start()

    # 等待子线程执行完毕
    t1.join()
    t2.join()
    e_time = time.time()
    print("最后修改完a={}".format(a))
    print("最后修改完得时间:{}".format(e_time - s_time))

    #
    # 锁后时间:


if __name__ == '__main__':
    main()
    """
    上锁前:
    线程2修改完a = 1269517
    线程1修改完a = 1302318
    最后修改完a = 1302318
    时间:0.15441060066223145
    
    上锁后:
    线程2修改完a=1976899
    线程1修改完a=2000000
    最后修改完a=2000000
    时间:1.1480515003204346
    
    """

5、死锁

  • 如果多线程存在多把锁
  • 线程1获取了锁1,还要在获取了锁2才能执行
  • 线程2获取了锁2,还要在获取锁1才能执行
  • 此时,任务1就等着任务2去释放了锁2,才有机会执行
  • 任务2就等着任务1释放了锁1,才有机会执行
  • 最后导致互相等对方释放锁,导致死锁

6、GIL全局解释器锁(扩展)

  • 控制线程切换运行得锁-解释器默认得锁
  • 利用一个时间得阈值去切换线程
  • 或者遇到IO阻塞得时候去切换
  • 多线程之间快速切换执行大大缩短时间就是因为任务在等待,但是CPU不会等待线程,它会去执行别得任务,多线程并发就是大大缩短了等待得时间
  • 具体请参考官方文档--线程
  • 注意:
    • Python语言和GIL没有半毛钱关系,仅仅是由于历史原因在Cpython解释器,难以移除GIL。
    • GIL:全局解释器锁,每个线程在执行得过程都需要先获取GIL,保证同一时刻只有一个线程可以执行代码。
    • Python使用多线程只能只用一个CPU是因为,最一开始设计Python解释器的人没有想到多核的情况,后面代码设计越多越多,导致很难去修改了,一直到现在。
    • 有人曾经删除过了GIL锁,但是效果非常差,甚至不如单核;---参考文档
    • Python使用多进程是可以利用多核CPU资源的

7、单线程和多线程

  • 单线程和多线程到底谁更快
    • IO密集型多线程会比单线程快很多,充分省去了IO等待的时间
    • CPU密集型,计算非常多的这种任务,对CPU消耗非常大的,单线程理论上比多线程要快一点,但是计算不大的这种,单线程和多线程消耗的时间其实差不多的

三、列队

  • Python的Queue模块中提供了同步的,线程安全的队列类,这些队列都实现了锁源语,能够在多线程中直接使用,可以使用队列来实现线程间的同步
    • FIFO :(先入先出)队列Queue
    • LIFO :(后入先出)队列LifoQueue
    • 优先级队列:PriorityQueue

1、Queue队列的方法

from queue import Queue

q = Queue(3) # 创建队列 
# 操作方法就可以了

  • Queue.qsize() : 返回当前队列包含的消息数量
  • Queue.empty() : 队列为空返回True,反之False
  • Queue.full() : 队列满了返回True,反之False
  • Queue.get(self,block=True,timeout=None) : 获取队列中的值
    • timeout (block=True) :队列内为空,get值的等待时间,时间内等不到报错,默认一直等
    • block : True:队列为空等待获取值,False:队列为空报错停止执行线程
  • Queue.put(self,block=True,timeout=None) : 写入队列
    • timeout(block=True) :队列满了写入队列值的等待时间,等待时间后还是满的报错,默认一直等待
    • block : True:队列满了等待写入值,False:队列满了报错停止执行线程
  • Queue.get_nowait() : 相当于Queue.get(False)
  • Queue.put_nowait() : 相当于Queue.put(False)
  • Queue.task_done() : 在完成一项工作之后,使用该方法,可以向队列发送一个信号,表示该任务执行完毕
  • Queue.join() 实际上意味着等待队列种多有的任务执行完之后,在往下执行,否者一直等待
    • 任务执行完毕,意味着着收到了Queue.task_done()这个信号
    • 如果用了join方法,没有发送信号会一直等待
    • put了多少个数据,在get使用之后,就的发送多少个Queue.task_done() 使用完毕的信号,不然也会一直等待
    • 如果发送的Queue.task_done()信号,比put的个数多,会报错

2、LifoQueue队列的方法

from queue import LifoQueue

q = LifoQueue(3) # 创建队列 
# 调用操作方法就可以了
  • 继承的Queue,和Queue的方法LifoQueue都有
  • 唯一不同的就是先进入的后出来
  • 内部重写了Queue的几个私有方法实现的先入后出,并没有扩展新方法

3、PriorityQueue队列的方法

from queue import PriorityQueue

q = PriorityQueue(3) # 创建队列 
# 调用操作方法就可以了
# 不同点,生产值的时候,需要传元组
q.put((1,333)
  • 继承的Queue,和Queue的方法PriorityQueue都有
  • 唯一不同的就是生产值的时候,需要传元组
    • 元组第一个元素,是优先级数值
    • get值的时候,优先get出数值最小的值
    • get来出来的也是元组
  • 内部重写了Queue的几个私有方法实现的优先级,并没有扩展新方法

4、生产者消费者模式实现

  • 为什么要使用生产者和消费模式

    • 在线程世界里,生产者是就是生产数据的线程;消费者就是消费数据的线程
    • 如果生产者处理速度很快,而消费者处理速度很慢,那么消费者必须等待消费者处理完,才能继续生产数据,同理返回来消费者消费的快,消费者必须等待生产
    • 为了解决这个问题于是引入了生产和消费者模式
  • 什么是生产者和消费者模式

    • 生产者消费者模式是通过一个容器来解决生产者和消费者的强偶问题
    • 生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯
    • 所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中去,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
  • 小案例

# 1、用一个队列来存储商品
# 2、创建一个专门生产商品的线程类,当商品数量少于50时,开始生产商品,每次生产200个商品,没生产完一轮 暂停1秒
# 3、创建一个专门消费商品的线程类,当商品数量大于10时就开始消费,,循环消费,每次消费3个。当商品实例少于10的时候,暂停2秒
# 4、创建一个线程生产商品,5个线程消费商品
import queue
import time
import threading


q = queue.Queue(250)


class ProGoods(threading.Thread):
    """生存商品"""

    def run(self):
        while True:
            if q.qsize() < 50:
                for j in range(200):
                    q.put("商品{}".format(str(j)))
                time.sleep(1)


class ConGoods(threading.Thread):
    """消费商品"""

    def run(self):
        while True:
            if q.qsize() > 10:
                for j in range(3):
                    q.get()
                    print(q.qsize())
            else:
                time.sleep(2)


def main():
    # 创建一个线程生产商品
    pro_thread = ProGoods()

    # 5个线程消费商品
    con_thread_list = []
    for i in range(5):
        con_thread = ConGoods()
        con_thread_list.append(con_thread)

    # 开启生存
    pro_thread.start()

    # 开启消费
    for con_thread in con_thread_list:
        con_thread.start()

    # 等待子线程
    pro_thread.join()
    for con_thread in con_thread_list:
        con_thread.join()

    print("最后剩余商品:{}".format(q.qsize()))


if __name__ == '__main__':
    main()

四、进程

1、进程介绍

  • 什么是进程
    • 程序:例如xxx.py这是程序,是一个静态的
    • 进程:一个程序运行起来,代码+用到的资源称之为进程,它是操作系统分配资源的基本单位
    • 不仅可以通过线程来完成多任务,进程也是可以的
  • 进程的状态
    • 工作中,任务数往往大于CPU的核数,即一定有一些任务在等待CPU执行,因此有了不同的状态
    • 就绪状态:运行的条件都已经满足了,正在等待CPU执行
    • 执行状态:CPU正在执行其功能
    • 等待状态:等待某些条件满足,例如一个程序sleep了,此时就处于等待状态

2、进程、线程对比

  • 功能
    • 进程,能够完成多任务,比如在一台电脑上能够同时运行多个软件
    • 线程,能够完成多任务,比如一个QQ中的多个聊天窗口
  • 定义不同
    • 进程是系统进行资源分配和调度的一个独立单位
    • 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位,线程自己基本上不能拥有系统资源,只拥有一点在运行中必不可少的资源,但是它可与同属于一个进程的其他线程共享进程所拥有的全部资源
  • 区别
    • 一个程序至少拥有一个进程,一个进程至少拥有一个线程
    • 线程的划分适度小于进程(资源比进程小),使用多进程成功的并发性高
    • 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大的提高了程序的运行效率
    • 线程不能独立执行,必须依存在进程中
    • 可以将进程理解为一个工厂的流水线,而其中的线程就是这个流水线上的工人
  • 优缺点
    • 线程个进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护,而进程正相反

3、multiprocessing模块

  • Process(group=None, target=None, name=None, args=(), kwargs={})
    • target: 给子进程传递执行的任务代码
    • args:给target指定的函数传递参数,元组方式
    • kwargs:给target指定的函数传递参数,字段方式
    • name:给进程设置一个名字
    • group:指定进程组,大多数情况下用不到
  • Process实例的常用方法
    • start() :启动子进程实例
    • is_alive() :判断子进程是否还在活着
    • jion(timeout) :是否等待子进程执行结束,或者等待多少
    • terminate() :不管任务是否完成,立即终止子进程
  • Process实例对象常用的属性
    • name :当前进程的名字
    • pid :当前进程的pid(进程号)
      • 主进程中可以进程p.pid获取
      • 子进程中用os.getpid()获取

4、进程之间的通信 -multiprocessing.Queue

  • 进程 multiprocessing.Queue 和线程 queue.Queue的区别
    • 方法一摸一样
    • queue.Queue :是进程内多线程之间非阻塞队列
    • multiprocessing.Queue :是跨进程通信队列

1、进程中的Queue的使用

  • 可以使用multiprocessing模块中的Queue实现多进程之间的数据传递,Queue本身是一个消息队列程序,首先用一个实例来演示一下Queue的工作原理:
    • 在父进程中创建两个子进程,
    • 主往Queue中写数据,两个子进程在Queue中读数据
  • 注意进程之间的Queue要当作参数传给任务函数(不共享全局变量)
import os
import time
import requests
from multiprocessing import Process, Queue

a = 0


def work1(q):
    global a
    while True:
        a += 1
        try:
            url = q.get_nowait()
            print("进程{},a ={},url ={}:".format(os.getpid(), a, url))
            requests.get(url)
        except queue.Empty:
            break


def work2(q):
    global a
    while True:
        a += 1
        try:
            url = q.get_nowait()
            print("进程{},a ={},url ={}:".format(os.getpid(), a, url))
            requests.get(url)
        except queue.Empty:
            break


if __name__ == '__main__':

    q = Queue(100)
    for i in range(10):
        q.put("http://127.0.0.1:5000")
    # 将队列对象参数传进任务函数
    p1 = Process(target=work1, args=(q,))
    p2 = Process(target=work2, args=(q,))
    # 开启
    st = time.time()
    p1.start()
    p2.start()
    # print(p1.name)
    # print(p1.pid)
    # 等待
    p1.join()
    p2.join()
    et = time.time()
    print("时间:", et - st)

5、进程池Pool

  • 当需要创建的子进程数量过多的时候,我们可以利用进程池来创建
  • 初始化Pool的时候,可以指定一个最大进程数,当前新的请求提交到Pool中中时,如果池中还没有满,那么就会创建一个新的进程用来执行该请求,但是如果池中的进程数已经达到最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务
  • Pool常用的方法
  • apply_async(func,args=(),kwds={},callback=None,error_callback=None): 使用非阻塞的方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程)
  • close() : 关闭进程池
  • terminate():不管子进程是否结束,立即终止
  • join() :主进程阻塞,等待子进程结束,必须在close或terminate之后使用

6、进程池中的队列

  • multiprocessing模块中Manager来创建队列
    • q = Manager().Queue()

案例

import os
import queue
import time
import requests
from multiprocessing import Pool, Manager

a = 0


def work(q):
    global a
    while True:
        a += 1
        try:
            url = q.get_nowait()
            print("进程{},a ={},url ={}:".format(os.getpid(), a, url))
            requests.get(url)
        except queue.Empty:
            break


if __name__ == '__main__':
    # 进程池队列
    q = Manager().Queue(100)
    for i in range(100):
        q.put("http://127.0.0.1:5000")
    # 进程池
    p = Pool(5)
    # 开启5个进程消费
    st = time.time()
    for i in range(5):
        p.apply_async(func=work, args=(q,))

    # 关闭进程池
    p.close()
    # 等待子进程结束,必须在关闭进程池后使用
    # 如果不等待,主进程结束,子进程也会跟着结束
    p.join()
    et = time.time()
    print("时间:", et - st)

五 、协程

1、利用yield机制来实现多任务

  • 内容回顾
    • 什么是生成器
    • 生成器证明定义
  • 案例
"""
生成器
生成器表达式
在函数中使用yield关键字:生成器
协程是通过生成器实现的多任务,在任务间不停的切换
"""
import time


def work1():
    for i in range(10):
        time.sleep(0.1)
        print("---work1----{}".format(i))
        yield


def work2():
    for i in range(10):
        time.sleep(0.1)
        print("---work2----{}".format(i))
        yield


# 创建两个生成器对象,实现多任务
g1 = work1()
g2 = work2()

while True:
    try:
        next(g1)
        next(g2)
    except StopIteration:
        break
        
# 协程:微线程
"""
协程的本质上是单任务
协程依赖于线程
协程相对与线程来说,占用的资源更少,几乎不要占用什么资源
"""

2、什么是协程

  • 协程是Python中另外一种实现多任务的方式,只不过比线程更小的占用执行单元,为啥说它是一个执行单元?因为它自带CPU上下文,这样只要在合适gr的时机,我们可以把一个协程切换到另一个协程,只要这个过程中保存或恢复CPU上下文那么程序还是可以运行的
  • 通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数一级什么时候在切换到原来的函数都由开发自己确定

3、协程和线程的差异

  • 线程:在实现多任务时,切换从系统层面远不止保存和恢复CPU上下文这么简单,操作系统为了程序运行的高效性,每个线程都有自己缓存Chche等数据,操作系统还会帮你做这些数据的恢复操作,所以线程的切换比较耗性能
  • 协程:协程的切换只是单纯的操作CPU的上下文,所以一秒切换个上百万次都系统都抗的住

4、greenlet模块

为了更好使用协程来完成多任务,Python中的greenlet模块对其封装,从而使得切换任务变得更加简单

  • 安装方式使用
    • 安装: pip install greenlet
    • 使用:创建多任务的greenlet对象
    • 切换:switch()方法传参和任务切换
import greenlet


def work1():
    for i in range(10):
        print("---work1----{}".format(i))
        # 标记切换g2任务
        g2.switch()


def work2():
    for i in range(10):
        print("---work2----{}".format(i))
         # 标记切换g1任务
        g1.switch()
        
# 创建两个greenlet对象
g1 = greenlet.greenlet(work1)
g2 = greenlet.greenlet(work2)

# 切进任务1,有参数可以传参数 *args, **kwargs
g1.switch()

5、gevent模块

  • 安装:
    • pip install gevent
  • gevent.spawn(*args, **kwargs)
    • 创建并开启协程
    • 第一个参数传任务函数
    • 其他参数依次传入即可
  • gevent.sleep()
    • 切换的标志
    • gevent里等待方法
  • gevent().join()
    • 线程等待协程的方法
import gevent

"""
协程:gevent,
对greenlet的再次封装
协程存在于线程之中,线程默认不会等等待协程执行的
"""


def work1(a):
    for i in range(10):
        print("---work{}----{}".format(a, i))
        # 切换的标志
        gevent.sleep(0.001)


def work2(a):
    for i in range(10):
        print("---work{}----{}".format(a, i))
        # 切换的标志
        gevent.sleep(0.001)


# 创建两个gevent对象
# 参数*args, **kwargs
# 默认就已经开启执行了
g1 = gevent.spawn(work1, a=1)
g2 = gevent.spawn(work2, a=2)

# 线程等待协程执行完
g1.join()
g2.join()

  • gevent切换还是要主动用自己的等待标志才会切换,还不够强大
    • gevent有一个补丁可以智能的切换,在IO阻塞的时候自动切换
    • 导入补丁:from gevent import monkey
    • 线程中调用:monkey.patch_all()
    • 此时只要有耗时操作就会自动切换
    • 注意: 一个进程内调用一次monkey.patch_all()方法即可
      • 多进程内每个子进程内调用,不能在主进程中调用
      • 多线程主线程内调用,不能每个子线程都调用
from gevent import monkey

# 放在导包之前,会改系统的环境
# 不然会有警告
monkey.patch_all()
import time
import requests
import gevent


def work1(a):
    for i in range(10):
        res = requests.get("http://www.baidu.com").status_code
        print("---work{}----{}的结果:{}".format(a, i, res))


def work2(a):
    for i in range(10):
        res = requests.get("http://www.baidu.com").status_code
        print("---work{}----{}的结果:{}".format(a, i, res))


# 创建两个gevent对象
# 参数*args, **kwargs
# 默认就已经开启执行了
st = time.time()
g1 = gevent.spawn(work1, a=1)
g2 = gevent.spawn(work2, a=2)

# 线程等待协程执行完
g1.join()
g2.join()
# work1(1)
# work2(2)
et = time.time()
print("时间:", et - st)

# 两个协程:时间: 0.4506347179412842
# 单线程 : 时间: 0.7228543758392334
  • 协程的队列
    • 线程、进程、进程池的队列都可以共享使用

六、总结

1、综合练习

开启多个进程,每个进程开启多线程,每个线程开启多个协程消费

import warnings
import time
import requests
import threading
import gevent
from gevent import monkey
from multiprocessing import Pool, Manager

warnings.filterwarnings("ignore")


# 10000个请求,开启2个进程,每个进程中实现3个线程,每个线程中实现5个协程去处理 计算时间

def consume_request(q):
    """
    取队列的url请求
    """
    a = 0
    while q.qsize() > 0:
        url = q.get()
        requests.get(url)
        a += 1
    print("协程完成消费次数:",a)


def gevents(q):
    """
    每个线程创建5个协程
    """
    gev_list = []
    for i in range(5):
        gev = gevent.spawn(consume_request, q)
        gev_list.append(gev)
    for gev in gev_list:
        gev.join()


def threads(pro_q):
    """
    创建三个线程
    """
    # 每个子进程调用一次协程补丁
    monkey.patch_all()

    th_list = []
    for i in range(3):
        th = threading.Thread(target=gevents, args=(pro_q,))
        th_list.append(th)
    for th in th_list:
        th.start()
    for th in th_list:
        th.join()


def create_process():
    """创建两个进程"""
    pro_q = Manager().Queue()
    for i in range(1000):
        pro_q.put("http://127.0.0.1:5000")
    s_time = time.time()
    pool = Pool(2)
    for i in range(2):
        pool.apply_async(func=threads, args=(pro_q,))
    pool.close()
    pool.join()
    e_time = time.time()
    print("耗时:", e_time - s_time)


if __name__ == '__main__':
    create_process()

2、简单总结

  • 进程是资源分配的资源
  • 线程是操作系统调度的单位
  • 进程切换需要的资源最大,效率最低
  • 线程切换需要的资源一般,效率一般(当然是了在不考虑GIL锁的情况下)
  • 协程切换任务资源很小,效率高
  • 对进程、多线程根据CPU核数不一样可能是并行,但是协程是在一个线程中,所以是并发
  • Python中的线程由于GIL锁的存在,并不能实现并行

推荐阅读