首页 > 技术文章 > 第九章并发编程

xu1296634150 2020-04-14 21:11 原文

9.1 操作系统基础知识

9.1.1 操作系统的发展史

操作系统的发展史

  • 第一代出现:人机矛盾

    • cpu利用率低

  • 批处理+磁带存储

    • 降低数据的读取时间

    • 提高cpu的利用率

  • 多道操作系统(重要)在一个任务遇到io的时候主动让出cpu

    • 数据隔离的概念

    • 时空复用

    • 能够在一个任务遇到io操作的时候主动把cpu让出来,给其他的任务使用

      • 切换要不要占时间:占用时间

      • 切换是操作系统做的

  • 分时操作系统 ---给时间分片,让多个任务轮流使用cpu

    • 短作业优先算法

    • 先来先服务算法

    • 时间分片

      • CPU的轮转

      • 每一个程序分配一个时间片

    • 要切换,要占用时间

    • 降低了cpu的利用率

    • 提高了用户体验

  • 分时操作系统+多道操作系统+实时操作系统

    • 多个程序一起在计算机中执行

    • 一个程序如果遇到IO操作,切出去让出cpu

    • 一个程序没有遇到io,但是时间片到时了,切出去让出cpu

    • celery是python中的分布式框架

  • 网络操作系统

  • 分布式操作系统

9.1.2操作系统的作用

所有的软件安装在操作系统上,操作系统调用

#一:隐藏了丑陋的硬件调用接口,为应用程序员提供调用硬件资源的更好,更简单,更清晰的模型(系统调用接口)。应用程序员有了这些接口后,就不用再考虑操作硬件的细节,专心开发自己的应用程序即可。
例如:操作系统提供了文件这个抽象概念,对文件的操作就是对磁盘的操作,有了文件我们无需再去考虑关于磁盘的读写控制(比如控制磁盘转动,移动磁头读写数据等细节),

#二:将应用程序对硬件资源的竞态请求变得有序化
例如:很多应用软件其实是共享一套计算机硬件,比方说有可能有三个应用程序同时需要申请打印机来输出内容,那么a程序竞争到了打印机资源就打印,然后可能是b竞争到打印机资源,也可能是c,这就导致了无序,打印机可能打印一段a的内容然后又去打印c...,操作系统的一个功能就是将这种无序变得有序。

9.1.3 操作系统的概念

操作系统就是一个协调、管理和控制计算机硬资源和软件资源的控制程序。

ps:操作系统位于计算机硬件和应用软件之间,本质也是一个软件。操作系统由操作系统的内核(运行于内核态,管理硬件资源)以及系统调用(运行于用户态,为应用程序写的应用程序提供系统调用接口)两部分组成。

9.2 进程理论

9.2.1 进程的概念

  • 进程的概念:正在进行的一个过程或者说一个任务。而负责执行任务的是cpu。

  • 程序和进程之间的区别

    • 程序只是一个文件(或者说是一堆代码)

    • 进程是这个文件被cpu运行起来了(进程是程序的运行过程)

  • 进程是计算机中最小的资源分配单位(*****)

  • 进程的三状态图:

    • --->就绪---操作系统调用---><--时间片到了----运行--->运行 遇到io--->阻塞--io结束-->回到就绪

  • 在操作系统中的唯一标识符:pid,pid是个随机值不固定

  • 操作系统调度进程的算法的原则

    • 短作业优先

    • 先来先服务

    • 时间片轮转

    • 多级反馈算法

9.2.2 并发与并行

  • 并发和并行的区别

    • 并行:两个程序,两个cpu,每个程序分别占用一个cpu自己执行自己的,看起来是同时执行,实际在每一个实际点上都在各自执行着

    • 并发:两个程序或多个程序,一个cpu,每个程序交替的在一个cpu上执行,看起来是在同时自行,但是实际上仍然是串行。并发是个伪并行,即看起来是同时运行的,单个cpu+多道技术就可以实现并发

  • 同步和异步

    • 同步:停下某个动作,做其他的动作,有依赖和等待(socketserver)

    • 异步:两个动作可以同时进行没有依赖和等待

  • 阻塞和非阻塞

    • 阻塞:cpu不工作

    • 非阻塞:cpu工作

  • 同步阻塞:调用conn.recv方法,socket阻塞的tcp协议的时候

  • 同步非阻塞:自定义的func(),没有io操作

    • socket 非阻塞的tcp协议的时候

    • 调用函数(这个函数内部不存在io操作)

  • 异步非阻塞*****

    • 没有io操作,把func()扔到其他任务里去执行了,我本身的任务和func任务各自执行各自的,cpu在使用

  • 异步阻塞:有io操作,把func()扔到其他任务里去执行了,我本身的任务和func任务各自执行各自的,cpu在使用

  • 进程的创建:双击程序,

  • 进程的结束

    • 正常退出/出错退出/严重错误/被其他进程杀死

  • 进程的特点:

    • 数据隔离的

    • 创建进程 时间开销大

    • 销毁进程 时间开销大

    • 进程之间切换 时间开销大

    • 如果两个程序 分别要做两件事

      • 起两个进程

9.2.3 线程

如果是一个程序,要分别做两件事

  • 视频软件

    • 下载a电影

    • 下载b电影

    • 播放c电影

  • 启动三个进程来完成上面的三件事情,但是开销大

  • 在进程中可以有多个线程,线程可以同时进行

  • 线程的概念:是进程中的一部分,不能脱离进程存在,每一个进程中至少有一个线程,线程是负责执行具体的代码的,进程是负责圈资源的

    • 进程是计算机中最小的资源分配单位(进程负责圈资源)

    • 线程是计算机中能被CPU调度的最小单位(线程是负责执行具体的代码的)

  • 开销:

    • 线程的创建,也需要一些开销(一个存储局部变量的结构,记录状态)

      • 创建、销毁、切换线程开销远远小于进程

注意:python中的线程比较特殊,所以进程也有可能被用到

  • 进程特点:数据隔离 开销大 同时执行几段代码

  • 线程特点: 数据共享 开销小 同时执行几段代码

9.3开启进程的方法

9.3.1multiprocessing模块

multiprocessing模块来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threaing的编程接口类似。multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

需要强调的是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

  • Process类

  • import os
    import time
    print('start')
    time.sleep(10)
    print(os.getpid(),os.getppid(),'end')
    
    pid ---process id  子进程
    ppid -- parent process 父进程

    子进程:在pycharm中启动的所以py程序都是pycharm的

    父进程:在父进程中创建了子进程

  • 应用
import os
import time
from multiprocessing import Process  #大写
def func():
    print('start',getpid())
    time.sleep(10)
    print('end',os.getpid())
 if __name__ =='__main__':
    p = Process(target=func) #func函数交给子进程执行
    p.start()  #异步的程序,开启进程的方法,但是并不等待这个进程真的开启
    print('main:',os.getpid()) #自己不需要执行func
    #输出 main: #父进程的pid
         start
         end 

例题

import os
import time
from multiprocessing import Process  #大写
def func():
    print('start',getpid())
    time.sleep(10)
    print('end',os.getpid())
 if __name__ =='__main__':
    p = Process(target=func) 
    p.start()
  print('main:',os.getpid()) 
    #输出 main:一个是父进程的pid
          main:  一个是子进程的pid
          start
          end 
import os
import time
from multiprocessing import Process  #大写
def eat():
    print('start eating',getpid())
    time.sleep(10)
    print('end eating',os.getpid())
 def sleep():
    print('eat sleeping',getpid())
    time.sleep(10)
    print('end sleeping',os.getpid()) 
 if __name__ =='__main__':
    p1= Process(target=eat) #创建了一个即将要执行eat函数的进程对象
    p1.start()  #开启一个进程
    p2 = Process(target=sleep) #创建了一个即将要执行sleep函数的进程对象
    p2.start()  #开启一个进程
    print('main:',os.getpid()) #自己不需要执行func
    
    #输出:main :1493
        start eating 13572
        start sleeping 14268
        end eating 1375
        end sleeping 14298
  • 主进程和子进程之间的关系:

    • 主进程会等待子进程的结束,主进程负责回收子进程的资源,如果子进程执行结束,父进程没有回收资源,那么子进程就会变成僵尸进程

  • 主进程的结束逻辑:

    • 主进程的代码结束

    • 所有的子进程结束

    • 给子进程回收资源

    • 主进程结束

    • import os
      import time
      from multiprocessing import Process  #大写
      def func():
          print('start',getpid())
          time.sleep(10)
          print('end',os.getpid())
       if __name__ =='__main__':
          p = Process(target=func) 
          p.start()
          print('main:',os.getpid())  #主进程没有结束等待子进程结束,主进程负责回收子进程的资源,如果子进程执行结束,父进程没有回收资源,那么子进程就会变成僵尸进程
          #输出 main:一个是父进程的pid
               start
               end 

       

  • 主进程怎么知道子进程结束了呢?

    • 基于网络、文件

    • join 方法:阻塞,直到子进程结束就结束

      • 把一个进程的结束事件封装成了一个join方法

      • 执行join方法的效果就是阻塞直到这个子进程执行结束就结束阻塞

      • 在多个子进程中使用join

    • import time
      import random
      from multiprocessing import Process
      def send_mali(i):
          time.sleep(random.random()) #每一封邮件的发送时间不确定,不确定哪个进程先
          print('发送了一封邮件',i)
      
      if __name__ =='__main__':
          l = []
          for i in range(10):
              p = Process(target=send_mail,args = (i,))
              p.start() #异步非阻塞
              l.append(p)#把p存起来
          print(l)
          for p in l:
              p.join()  #同步阻塞,知道p对应的进程结束之后才结束阻塞
           print('500封邮件已发送完毕')
       #输出:  发送了一封邮件
       #       500封邮件已发送完毕
      import mutilprocessing import Process
      def send_mail():
          print('发送了一封邮件')
      if __name__ == '__main__':
          l = []
          for i in range(10):    
              p = Process(send_mail)
              p.start()
              l.append(p)
           for p in l:p.join()
           print('发送了500封邮件')

9.3.2补充 if_ name _ == '_ _main _ _'

控制当这个py文件被当做脚本的时候的直接执行里面的代码,当这个py文件被当做模块导入的时候,就不执行这里面的代码,而只打印文件的名称

  • _ name _ = ='_ _main _ _'

    • 执行的文件就是'_ _ name _ _'所在的文件

    • _ name _ = ='文件名'

      • _ _name _ _所在的文件被导入执行的时候,打印出来的会是文件名

注意:在windows系统中开启进程这个动作必须放在 if_ name _ == '_ _main _ _'

  • 操作系统创建进程的方式不同

    • Windows操作系统执行开启进程的代码

      • 实际上新的子进程需要通过import父进程的代码来完成数据单额导入工作

      • 所以有一些内容我们只希望在父进程中完成,就写在 if_ name _ == '_ _main _ _' :下面

    • ios linux操作系统创建进程 fork

9.3.3 Process模块

  • 守护进程

    • 定义:有一个参数可以把一个子进程设置为一个守护进程

    • 子进程是随着主进程的代码结束而结束的

    

import time
from mutilprocessing import Process
def func():
    while True:
        print('is alive')
        time.sleep(0.5)

if __name__ == '__main__':
    p = Process(target = func)
    p.daemon = True  #把p子进程设置成了一个守护进程
    p.start()
    time.sleep(2)
  # 输出‘ is alive
          is alive
           is alive
          is alive’
            
            
   # 守护进程是随着主进程的代码结束而结束的
#所有的子进程都必须在主进程结束之前结束,由主进程回收资源
import time
from mutilprocessing import Process
def func():
    while True:
        print('is alive')
        time.sleep(0.5)
 def func2():
    for i in range(5):
        print('fuc2')
        time.sleep(1)

if __name__ == '__main__':
    p = Process(target = func)
    p.daemon = True  #把p子进程设置成了一个守护进程
    p.start()
    p2 = Process(target = func2)
    p2.start()
    time.sleep(2)
    
    输出:is alice
          func2
          is alice
           is alice
           func2
            is alice  #2秒
            func2
           func2
              func2 #5秒
      
  • 守护进程的应用:

    • 生产者消费者模型的时候

    • 和守护线程做对比的时候

  • Process类对象的其他方法

  • isalive() 查看进程是否还活着

  • import time
    from mutilprocessing import Process
    def func():
        while True:
            print('is alive')
            time.sleep(0.5)
    
    if __name__ == '__main__':
        p = Process(target = func)
        p.daemon = True  #把p子进程设置成了一个守护进程
        p.start()
        print(p.isalive())

     

  • terminate() 强制执行结束子进程
import time
from mutilprocessing import Process
def func():
    while True:
        print('is alive')
        time.sleep(0.5)

if __name__ == '__main__':
    p = Process(target = func)
    p.daemon = True  #把p子进程设置成了一个守护进程
    p.start()
    print(p.is_alive())
    p.teminate()  #强制结束一个子进程,异步的
    print(p.is_alive()) #还会打印是因为进程还活着,因为操作系统还没来得及关闭进程
    time.sleep(0.1)
    print(p.is_alive())#操作系统已经响应了要关闭系统的需求,再去检测的时候,得到的结果是进程已经结束了
  • 什么是异步非阻塞

    • terminate 异步非阻塞,在整个发起需求的过程中不等待结果,不等待结果结束,直接执行其他的代码

  • 使用面向对象的方式开启一个子进程

import os
from mutilprocessing import Process
class MyProcess1(Process):
    def run(self):
        print(os.getpid(),os.getppid())
 class MyProcess2(Process):
    def run(self):
        for i in range(5):
             print('ins2')
             time.sleep(1)
           
       
if __name__ == '__main__':
    mp = MyProcess1()
    mp.start()
    mp = MyProcess2()
    mp.start()
    print('main:',os.getpid())
import os
from mutilprocessing import Process
class MyProcess1(Process):
    def __init__(self,x,y):  #需要传参就要写init还要带上父类的init
        self.x = x
        self.y = y
        super().__init__()
    def run(self): #子进程中的事情都要写在run函数里面
        print(os.getpid(),os.getppid())
 class MyProcess2(Process):
    def run(self):
        for i in range(5):
             print('ins2')
             time.sleep(1)
           
       
if __name__ == '__main__':
    mp = MyProcess1()
    mp.start()
    mp = MyProcess2()
    mp.start()
    print('main:',os.getpid())

并发编程能够做的事:

  • 实现能够响应多个client端的server

  • 抢票系统

import time
import json
from mutilprocessing import Process,Lock
def search_ticket(): #查看票
    with open('ticket_count') as f:
        dic = json.load(f)
        print('%s查询结果 :%s张余票'%(user,dic['count'])
def buy_ticket(user,lock):
      with lock: #推荐用,因为一个进程有问题不影响其他进程的进行
      #lock.acquire() #给这段代码加上锁
      time.sleep(0.02)
      with open('ticket_count') as f:
         dic = json.load(f)   
       if dic['count'] >0:
              print('%s买到票了'%(user)
              dic['count'] -=1
        else:
              print('%s没有余票了'%(user))
       time.sleep(0.02)
       with open('ticket_count','w')as f:     
             json.dump(dic,f)
      # lock.release()  #给这段代码解锁 
if __name__ =='__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target = buy_ticket,args = ('user%s'%i,lock))
        p.start()
    

9.4锁

9.4.1 锁的意义

  • 如果在一个并发的场景下,涉及到某部分内容是需要修改一些所有进程的共享数据资源需要加锁来维护数据的安全

  • 在数据安全的基础上,才考虑效率问题

  • 同步存在的意义

    • 数据的安全性

  • 在主进程中实例化 lock = Lock()

  • 把这把锁传递 给子进程

  • 在子进程中对需要加锁的代码 进行with lock:

    • with lock相当于lock.acquire()和lock.release()

  • 在进程中需要加锁的场景

    • 操作共享的数据资源

    • 对资源进行修改、删除操作

  • 加锁之后能够保证数据的安全性,但是也降低了程序的效率

import time
import json
from mutilprocessing import Process,Lock
def search_ticket(): #查看票
    with open('ticket_count') as f:
        dic = json.load(f)
        print('%s查询结果 :%s张余票'%(user,dic['count'])
def buy_ticket(user,lock):
      time.sleep(0.02)
      with open('ticket_count') as f:
         dic = json.load(f)   
       if dic['count'] >0:
              print('%s买到票了'%(user)
              dic['count'] -=1
        else:
              print('%s没有余票了'%(user))
       time.sleep(0.02)
       with open('ticket_count','w')as f:     
             json.dump(dic,f)
def task(user,lock):
    search_tick(user)
     with lock:
          buy_ticket(user,lock)
if __name__ =='__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target = task,args = ('user%s'%i,lock))

9.4.2 线程中的锁

  • 线程中是否会产生数据不安全?---是,在代码执行过程中移交gil锁,导致了结果出错

  • 即便是线程,即使有gil锁,在操作全局变量也会出现数据不安全的问题

  • +=/-=/*=/+ 先计算在赋值的时候,还有list[0] =+1,dic['key'] -=1,才容易出现数据不安全的问题(import diss--查看代码执行过程)

  • 加锁:会影响程序的执行效率,但是保证了数据的安全

a = 0
def add_f(lock):
    global a
    for i in range(200000):
        with lock:
            a += 1

def sub_f(lock):
    global a
    for i in range(200000):
        with lock:
            a -= 1

from threading import Thread,Lock
lock = Lock()
t1 = Thread(target=add_f,args=(lock,))
t1.start()
t2 = Thread(target=sub_f,args=(lock,))
t2.start()
t1.join()
t2.join()
print(a)

9.4.3 互斥锁

互斥锁:是锁中的一种,在同一个线程中,不能连续acquire多次

from threading import Lock
lock = Lock()
lock.acquire()
print('*'*20)  #只打印第一个print
lock.acquire()
print('-'*20)
#在同一个线程中,不能连续acquire多次

from threading import Lock
lock = Lock()
lock.acquire()
print('*'*20)  
lock.release()
lock.acquire()
print('-'*20)
lock.release()
acquire和release配套使用

9.4.4 单例模式下的锁

不管在哪个情况下都必须加锁

import time
from threading import Lock
class A:
    __instance = None
    lock = Lock()  #
    def __new__(cls, *args, **kwargs):
        with cls.lock:  #
            if not cls.__instance:
                time.sleep(0.1)
                cls.__instance = super().__new__(cls)
        return cls.__instance
    def __init__(self,name,age):
        self.name = name
        self.age = age

def func():
    a = A('alex', 84)
    print(a)

from threading import Thread
for i in range(10):
    t = Thread(target=func)
    t.start()

9.4.5死锁现象

  • 死锁现象:是指两个或两个以上的进程或线程在执行过程中, 因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name,noodle_lock,fork_lock):
    noodle_lock.acquire()
    print('%s抢到面了'%name)
    fork_lock.acquire()
    print('%s抢到叉子了' % name)
    print('%s吃了一口面'%name)
    time.sleep(0.1) #在停留0.1秒的时候,wusir线程和太白线程已经开启了,所以会出现死锁现象
    fork_lock.release()
    print('%s放下叉子了' % name)
    noodle_lock.release()
    print('%s放下面了' % name)

def eat2(name,noodle_lock,fork_lock):
    fork_lock.acquire()
    print('%s抢到叉子了' % name)
    noodle_lock.acquire()
    print('%s抢到面了'%name)
    print('%s吃了一口面'%name)
    time.sleep(0.1)
    noodle_lock.release()
    print('%s放下面了' % name)
    fork_lock.release()
    print('%s放下叉子了' % name)

lst = ['alex','wusir','taibai','yuan']
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()
  • 死锁是怎么发生的?

    • 有多把锁,一把以上

    • 多把锁交替使用

  • 怎么解决死锁

    • 递归锁--将多把互斥锁变成了一把递归锁

      • 快速解决问题

      • 效率差

      • 递归锁也会发生死锁现象,多把锁交替使用的时候

    • 优化代码逻辑

      • 可以使用互斥锁解决问题

      • 效率相对好

      • 解决问题的效率相对低

 

import time
from threading import RLock,Thread
# noodle_lock = RLock()
# fork_lock = RLock()
noodle_lock = fork_lock = RLock()
print(noodle_lock,fork_lock)
def eat1(name,noodle_lock,fork_lock):
    noodle_lock.acquire()
    print('%s抢到面了'%name)
    fork_lock.acquire()
    print('%s抢到叉子了' % name)
    print('%s吃了一口面'%name)
    time.sleep(0.1)
    fork_lock.release()
    print('%s放下叉子了' % name)
    noodle_lock.release()
    print('%s放下面了' % name)

def eat2(name,noodle_lock,fork_lock):
    fork_lock.acquire()
    print('%s抢到叉子了' % name)
    noodle_lock.acquire()
    print('%s抢到面了'%name)
    print('%s吃了一口面'%name)
    time.sleep(0.1)
    noodle_lock.release()
    print('%s放下面了' % name)
    fork_lock.release()
    print('%s放下叉子了' % name)

lst = ['alex','wusir','taibai','yuan']
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()  

 

9.4.6 递归锁

  • 在同一个线程中,可以连续acquire多次不会锁住

  • 递归锁解决死锁的原因是把多把互斥锁变成一把递归锁了

  • 递归锁如果创建了两把锁也是会产生死锁现象

  • 递归锁

    • 优点:在同一个进程中多次acquire也不会发生阻塞

    • 缺点:占用了更多资源

import time
from threading import RLock,Thread
# noodle_lock = RLock()
# fork_lock = RLock()
noodle_lock = fork_lock = RLock()
print(noodle_lock,fork_lock)
def eat1(name,noodle_lock,fork_lock):
    noodle_lock.acquire()
    print('%s抢到面了'%name)
    fork_lock.acquire()
    print('%s抢到叉子了' % name)
    print('%s吃了一口面'%name)
    time.sleep(0.1)
    fork_lock.release()
    print('%s放下叉子了' % name)
    noodle_lock.release()
    print('%s放下面了' % name)

def eat2(name,noodle_lock,fork_lock):
    fork_lock.acquire()
    print('%s抢到叉子了' % name)
    noodle_lock.acquire()
    print('%s抢到面了'%name)
    print('%s吃了一口面'%name)
    time.sleep(0.1)
    noodle_lock.release()
    print('%s放下面了' % name)
    fork_lock.release()
    print('%s放下叉子了' % name)

lst = ['alex','wusir','taibai','yuan']
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=

9.4.7锁总结

  • 互斥锁:

    • 在一个线程中连续多次acquire会死锁

  • 递归锁

    • 在一个线程中连续多次acquire不会死锁

  • 死锁现象

    • 是怎么发生的?

      • 有多把锁,一把以上

      • 多把锁交替使用

    • 怎么解决死锁

      • 递归锁--将多把互斥锁变成了一把递归锁

        • 快速解决问题

        • 效率差

      • 递归锁也会发生死锁现象,多把锁交替使用的时候

      • 优化代码逻辑

        • 可以使用互斥锁解决问题

        • 效率相对好

        • 解决问题的效率相对低

  • 进程和线程都有锁

    • 所有在线程中能工作的基本都不能在进程中工作,因为进程中工作方式比线程中的工作方式更复杂

    • 在进程中能够使用的基本在线程中也可以使用

9.4.8进程之间的通讯

  • 进程之间的数据隔离

    • 父进程和子进程之间的数据隔离,子进程不能改变父进程

 

from mutilprocessing import Process
n = 100
def func():
    global n
    n -=1
if __name__ == '__main__':
    l = []
    for i in range(10):
        p = Process(target = func)
        p.start()
        l.append(p)
     for p in l: p.join()
     print(n)  #100

通讯

  • 进程之间的通讯 -IPC(inter process communication)

  • 队列 Queue

 

from mutilprocessing import Process,Queue
def func(exp,q):
    ret = eval(exp)
    q.put(ret)
 if __name__ == '__main__':
    q = Queue()
    Process(target = func,args = ('1+2+3',)).start()
    print(q.get())
  • Queue 天生就是数据安全的

    • 基于文件家族的socket实现的,写文件是基于pickle,还有lock锁

    • 管道+锁

  • pipe 管道 天生没有数据安全,基于socket 和pickle

  • from mutilprocessing import Process Pipe
    Pip = Pipe()
    pip.send()
    pip.recv()
  • 队列和管道都是ipc机制,队列进程之间数据安全,管道进程之间数据不安全

  • 单进程的程序

    • 在队列为空的时候会发生阻塞,当队列为满的时候在向队列中放数据,队列会阻塞

  • from mutilprocessing import Process Queue
    q = Queue(5)
    q.put(1)  #put放数据
    q.put(2)
    q.put(1)
    q.put(2)
    q.put(1)
    print('55555')
    q.put(6) #当队列为满的时候在向队列中放数据,队列会阻塞
    print('6666')
    
    
    print(q.get())   #get取数据
    print(q.get())  
    print(q.get())  #在队列为空的时候会发生阻塞
    import queue
    from mutilprocessing import Process Queue
    q = Queue(5)
    q.put(1)  #put放数据
    q.put(2)
    q.put(1)
    q.put(2)
    q.put(1)
    print('55555')
    try:
        q.put_nowait(6) #当队列为满的时候再向队列中放数据,会报错且丢失数据
    except queue.Full:
        pass
    print('6666')
    
    
    print(q.get())   #get取数据,是阻塞
    print(q.get())  
    print(q.get())  #在队列为空的时候会发生阻塞
    
    print(q.get())   #get取数据
    print(q.get())
    tryprint(q.get_nowait())  #在队列为空时,直接报错,get_nowait非阻塞
    except queue.Empty:pass

     

 其他方法

 

queue.empty() 判断队列是否为空
q.qsize()返回队列中的数据个数
q.full()判断队列是否为满
  • ipc是进程之间的通信

  • 常见的ipc机制:

    • 内置的ipc机制(队列 、管道),

    • 第三方工具提供给我们的ipc机制

      • redis

      • memcache

      • kafka

      • rabbitmq

      • 第三方ipc机制可以满足并发需求、高可用、断电保存数据、解耦

9.4.9 生产者消费者模型

概念:是指通过一个容器来平衡生产者和消费者之间的强耦合问题,生成长和消费者不直接进行通讯,而是通过阻塞队列来进行通讯,生产者生产完数据不等待而是直接扔到队列中,消费者也直接从阻塞队列取值,阻塞队列就是一个缓冲区,平衡了生产者和消费者之间的处理问题

  • 解耦(修改,复用,可读性高)

  • 把写在一起的大的功能分开成多个小的功能处理

  • 生产者:生产数据

  • 消费者:处理数据

  • 与进程的关系:

    • 一个进程就是一个生产者

    • 一个进程就是一个消费者

  • 与队列的关系:

    • 生产者和消费者之间的容器就是队列,队列有大小可以设置大小

    • import time
      import random
      from multiprocessing import Process,Queue
      
      def producer(q,name,food):
          for i in range(10):
              time.sleep(random.random())
              fd = '%s%s'%(food,i)
              quit(fd)
              print('%s生产了一个%s'%(name,food))
      
      
      
      def consumer(q,name):
          while True:
              food = q.get()
              time.sleep(random.randint(1,3))
              print('%s吃了%s'%(name,food))
      
      
      
      if __name__ == '__main__':
         q = Queue(10) #大小限制
         p1 = Process(target=producer,args=(q,'wusir','泔水'))
         p1.start()
         c1 = Process(target=consumer,args=(q,'alex'))
         c1.start()
         c2 = Process(target=consumer, args=(q, 'alex2'))
         c2.start()

       

    • joinablequeue

      • 我们使用共享队列,创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

      • """JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:
        
        q.task_done() 
        使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
        
        q.join() 
        生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 
        下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。"""
  • 例题
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
        q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
    q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。


if __name__ == '__main__':
    q=JoinableQueue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #开始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('') 
    
    #主进程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    #因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

9.4.10 进程之间的数据共享

  • multiprocessing 中有一个Manager类

    • 封装了所有和进程相关的, 可以数据共享、数据传递

    • 对于字典、列表这一类的数据操作的时候会产生数据不安全,需要加锁解决问题,并且需要尽可能少的使用这种方式

以创建共享的list\dict,修改数据不安全,可以加锁

from multiprocessing import Manager,Lock
def func(dic,lock):
    with lock:
        dic['count'] -=1
if __name__ == '__main__':
    #m = Manager()
    with Managet() as f:
        lock = Lock()
        dic = m.dict({'count':100})
        p_l = []
        for i in range(100):
            p = Process(target = func,args= (dic,))
            p.start()
            p_l.append(p)
         for p in p_l:p.join()
         print(dic)
        # mulprocessing中有一个manager类
# 封装了所有和进程相关的 数据共享 数据传递
# 相关的数据类型
# 但是对于 字典 列表这一类的数据操作的时候会产生数据不安全
# 需要加锁解决问题,并且需要尽量少的使用这种方式
    

9.5 线程

9.5.1 初识线程

  • python解释器中的线程问题 (** ** *)

  • 线程:开销小 数据共享 是进程的一部分

  • 进程:开销大 数据隔离 是一个资源分配单位

  • cpython解释器不能实现多线程利用多核

    • 锁:gil 全局解释器锁

      • 保证了整个python程序中,只能有一个线程被cpu知晓

      • 原因:cpython解释器中特殊的垃圾回收机制

      • gil锁导致了线程不能并行,只能并发

      • 所以使用多线程并不影响高io型的操作,只会对高计算型的程序有效率上的影响

      • 遇到高计算:多进程 +多线程

        • 分布式

  • 遇到io操作的时候

    • 5亿条cup指令/s

    • 5-6cup指令 == 一句python代码

    • 几千万条python代码

  • web框架 几乎都是多线程

  • gil锁导致了线程不能并行,只能并发

9.5.2 用python操作线程

  • threading类

  • multiprocessing是完全仿照threading的类写的

  • 启动子线程 start

import os
from threading import Thread
def func():
    print(os.getpid())
Thread(target = func).start() #开启一个线程的速度非常快
print('--->',os.getpid())
#12828
 ---->12828
  • 开启多个子线程
import time
import os
from threading import Thread
def func(i):
    print('start son thread',i)
    time.sleep(1)
    print('end son thread',i,os.getpid())
for i in range(10):
    Thread(target=func,args=(i,)).start()
  • 主线程什么时候结束?

    • 要等到所有子线程结束之后才结束,主线程如果结束了,主进程也就结束了

  • join方法 阻塞 知道子线程执行结束

import os
import time
from threading import Thread
def func(i):
    print('start son thread',i)
    time.sleep(1)
    print('end son thread',i,os.getpid())
       
t_l = []
for i in range(10):
    t = Thread(target = func,args=(0,))
    t.start()
    t_l.append(t)
for t in t_l:t.join()
print('子线程执行完毕')
  • 使用面向对象的方法启动线程
class MyThread(Thread):
    def __init(self,i):
        self.i = i
        super()__init__()
    def run(self):
        print('start',self.i)
        time.sleep(1)
        print('end')
for i in range(10):
    MyThread(i).start()
  • 线程id:iden
class MyThread(Thread):
     def __init__(self,i):
        self.i = i
        super()__init__()#执行父类的init
    def run(self):
        print('start',self.i)
        time.sleep(1)
        print('end')
for i in range(10):
    t= MyThread(i)
    t.start()
    print(t.ident)

线程里的一些其他方法

  • 在函数内想打印ident,需要用current_thread

  • 在线程里面使用current_thread,就是查看当前线程所在的对象

from threading import current_thread,enumerate,active_count
def func(i):
    t = current_thread
    print('start son thread',i,t.ident)
    time.sleep(1)
    print('end son thread',i,os.getpid())
t = Thread(target = func,args=(1,))
t.start()
print(t.ident)
print(enumerate())#打印主线程和子线程的id
print(active_count) ==len(enumerate())
  • terminate 结束进程 :在线程中不能从主线程结束一个子线程

    • 子线程只能等待自己的代码执行完毕结束

  • 测试

    • 进程和线程的效率差

      • 进程效率比线程差很多

def func(a,b):
    c = a + b
import time
from multiprocessing import Process
from threading import Thread
if __name__ == '__main__':
    start = time.time(    )
    p_l =[]
    for i in range(10):
        p = Process(target = func,args=(i,i*2))
        p.start()
        p_l.append(p)
     for p in p_l:p.join()
     print('process',time.time()-start)
    
    
def func(a,b):
    c = a + b
import time
from multiprocessing import Process
from threading import Thread
if __name__ == '__main__':
    start = time.time(    )
    p_l =[]
    for i in range(10):
        p = Thread(target = func,args=(i,i*2))
        p.start()
        p_l.append(p)
     for p in p_l:p.join()
     print('thread',time.time()-start)
  • 线程数据隔离还是共享?共享的
from threading import Thread
n = 100
def func():
    global n  #不要在子线程里随便修改全局变量
    n -= 1
 t_l = []
 for i in range(100):
    t = Thread(target=func)
    t_l.append(t)
    t.start()
for t in t_l:t.join()
print(n)  #0

守护线程:守护主线程的结束

  • 守护线程一直等待所有的非守护线程都结束之后才结束的

  • 除了守护主线程的代码之外也会守护子线程

import time
from threading import Thread
def son1():
    whlie True:
        time.sleep(0.5)
        print('in son1')
 def son2():
    for i in range(5):
        time.sleep(1)
        print('in son2')
t = Thread(target = son1)
t.daemon = True
t.start()
Thread(target = son2).start()
time.sleep(3)
#守护线程一直等待所有的非守护线程都结束之后才结束的

9.5.3队列 queue

  • import queue

    • 进程之间的通信 线程安全

from queue import Queue  # 先进先出队列
# q = Queue(5) #队列大小
# q.put(0)
# q.put(1)
# q.put(2)
# q.put(3)
# q.put(4)
# print('444444')
#
#
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())
from queue import LifoQueue  # 后进先出队列
# last in first out 栈
# lfq = LifoQueue(4)
# lfq.put(1)
# lfq.put(3)
# lfq.put(2)
# print(lfq.get()) #2
# print(lfq.get())#3
# print(lfq.get())#1
  • 先进先出

    • 写一个server,所有的用户的请求放在队列里

    • 先来先服务的思想

  • 后进先出

    • 算法

  • 优先级队列

    • 自动的排序

    • 抢票的用户级别 100000 100001

    • 告警级别

# from queue import PriorityQueue
# pq = PriorityQueue()
# pq.put((10,'alex'))
# pq.put((6,'wusir'))
# pq.put((20,'yuan'))
# print(pq.get())  #6,'wusir'
# print(pq.get())  #10,'alex'
# print(pq.get())  #20,'yuan'
按照元组的第一个数字来打印的,先小后大

 

9.6 池

为什么要有池?

  • 进程、线程的开启、关闭、切换都需要时间

  • 池的概念:预先的开启固定个数的进程数,当任务来临的时候,直接提交给已经开好的进程,让这个进程去执行就可以了,节省了进程进程、线程的开启、关闭、切换的时间,并且减轻了操作系统调度的负担

9.6.1 进程池

  • submit() 异步提交

  • shutdown()关闭池之后就不能继续提交任务,并且会阻塞,直到已经提交的任务完成

import os
import time
import random
from concurrent.futures import ProcessPoolExecutor
# submit + shutdown
# def func():
#     print('start',os.getpid())
#     time.sleep(random.randint(1,3))
#     print('end', os.getpid())
# if __name__ == '__main__':
#     p = ProcessPoolExecutor(5)  #创建池,5个进程
#     for i in range(10):
#         p.submit(func) #异步提交
#     p.shutdown()   # 关闭池之后就不能继续提交任务,并且会阻塞,直到已经提交的任务完成
#     print('main',os.getpid())
  • 任务的参数 + 返回值
import os
import time
import random
from concurrent.futures import ProcessPoolExecutor
# def func(i,name):
#     print('start',os.getpid())
#     time.sleep(random.randint(1,3))
#     print('end', os.getpid())
#     return '%s * %s'%(i,os.getpid())
# if __name__ == '__main__':
#     p = ProcessPoolExecutor(5)
#     ret_l = []
#     for i in range(10):
#         ret = p.submit(func,i,'alex')
#         ret_l.append(ret)
#     for ret in ret_l:
#         print('ret-->',ret.result())  # ret.result() 同步阻塞,等程序结束完了返回值放进来了在取值
#     print('main',os.getpid())

进程池特点

  • 开销大

  • 一个池中的任务个数限制了我们程序的并发个数

9.6.2 线程池

# from concurrent.futures import ThreadPoolExecutor
# def func(i):
#     print('start', os.getpid())
#     time.sleep(random.randint(1,3))
#     print('end', os.getpid())
#     return '%s * %s'%(i,os.getpid())
# tp = ThreadPoolExecutor(20) #cpu个数*4或5
# ret_l = []
# for i in range(10):
#     ret = tp.submit(func,i)
#     ret_l.append(ret)
# tp.shutdown()
# print('main')
# for ret in ret_l:
#     print('------>',ret.result())
  • map()适用的场景更少
# from concurrent.futures import ThreadPoolExecutor
# def func(i):
#     print('start', os.getpid())
#     time.sleep(random.randint(1,3))
#     print('end', os.getpid())
#     return '%s * %s'%(i,os.getpid())
# tp = ThreadPoolExecutor(20)
# ret = tp.map(func,range(20)) #循环可迭代对象,并且把每个值传到函数
# for i in ret:
#     print(i)

回调函数(谁先回来先调用谁)

  • 效率更高

   

import requests
from concurrent.futures import ThreadPoolExecutor
def get_page(url):
    res = requests.get(url)
    return {'url':url,'content':res.text}

def parserpage(ret):
    dic = ret.result()
    print(dic['url'])
tp = ThreadPoolExecutor(5)
url_lst = [
    'http://www.baidu.com',   # 3
    'http://www.cnblogs.com', # 1
    'http://www.douban.com',  # 1
    'http://www.tencent.com',
    'http://www.cnblogs.com/Eva-J/articles/8306047.html',
    'http://www.cnblogs.com/Eva-J/articles/7206498.html',
]
ret_l = []
for url in url_lst:
    ret = tp.submit(get_page,url)
    ret_l.append(ret)
    ret.add_done_call(parserpage)  #给当前任务回调函数

9.6.3 总结池

  • 线程池:ThreadPoolExcutor

  • 进程池:ProcessPoolExcutor

  • 创建池子

tp = ThreadPoolExcutor(池中线程/进程的个数)
# 异步提交任务
ret = tp.submit(函数,参数1,参数2....)
# 获取返回值
ret.result()
#在异步的执行完所有任务之后,主线程/主进程才开始执行的代码
tp.shutdown() 阻塞 直到所有的任务都执行完毕
  • map 方法
# ret = tp.map(func,iterable) 迭代获取iterable中的内容,作为func的参数,让子线程来执行对应的任务
# for i in ret: 每一个都是任务的返回值

是单独开线程、进程还是池?

  • 如果只是开启一个子线程做一件事情,就可以单独开线程

  • 有一定量的任务等待程序去做,要达到一定的并发数,开启线程池

  • 根据你程序的io操作也可以判定是用池还是不用池

    • socket 大量的阻塞io recv recvfrom sockerserver

    • 爬虫的时候--池

 

9.7 协程

9.7.1 初识协程

  • 使用协程的优势:

    • 一个线程中的阻塞都被其他的各种任务沾满了

    • 让操作系统觉得这个线程很忙,尽量的减少这个线程进入阻塞的状态

    • 多个任务在同一个线程中执行也达到了一个并发的效果,规避了每一个任务的io操作,减少了线程的个数,减轻了操作系统的负担

  • 协程:用户级别的,由我们自己写的python代码来控制切换的,并且是操作系统不可见的

  • 在cpython解释器下--协程和线程都不能利用多核,都是在一个cpu上轮流执行

    • 由于多线程本身就不能利用多核

    • 所有即便是开启了多个线程也只能轮流在一个cpu上执行

    • 协程如果把所有任务的io操作都规避掉,只剩下需要使用cpu的操作就意味着协程就可以做到提高cpu利用率的效果

  • 多线程和协程

    • 线程 切换需要操作系统,开销大,操作系统不可控,给操作系统的压力大

      • 操作系统对io操作的感知更加灵敏

    • 协程 切换需要python代码,开销小,用户操作可控,完全不会增加操作系统的压力

      • 用户级别能够对io操作的感知比较低

  • 协程:能够在一个线程下的多个任务之间来回切换,那么每一个任务都是一个协程

  • 两种切换方式

    • 原生python完成 yield asyncio

    • c语言完成的python模块 greenlet gevent

9.7.2 greenlet 模块

# import time
# from  greenlet import greenlet
#
# def eat():
#     print('wusir is eating')
#     time.sleep(0.5)
#     g2.switch()
#     print('wusir finished eat')
#
# def sleep():
#     print('小马哥 is sleeping')
#     time.sleep(0.5)
#     print('小马哥 finished sleep')
#     g1.switch()
#
# g1 = greenlet(eat)
# g2 = greenlet(sleep)
# g1.switch()#开关

输出:wusir is eating
    小马哥 is sleeping
    小马哥 finished sleep
     wusir finished eat

事件循环的概念:第三者一直在循环所有的任务,调度所有的任务

9.7.3gevent

# import time
# print('-->',time.sleep)
# import gevent
# from gevent import monkey
# monkey.patch_all() #会识别time模块了,会把time模块里面的time.sleep变成它自己的time.sleep
# def eat():
#     print('wusir is eating')
#     print('in eat: ',time.sleep)
#     time.sleep(1)
#     print('wusir finished eat')
#
# def sleep():
#     print('小马哥 is sleeping')
#     time.sleep(1)
#     print('小马哥 finished sleep')
#
# g1 = gevent.spawn(eat)  # 创造一个协程任务
# g2 = gevent.spawn(sleep)  # 创造一个协程任务
# g1.join()   # 阻塞 直到g1任务完成为止
# g2.join()   # 阻塞 直到g1任务完成为止
# # gevent.joinall([g1,g2,g3])
# g_l = []
# for i in range(10):
#     g = gevent.spawn(eat)
#     g_l.append(g)
# gevent.joinall(g_l)
time.sleep不能完成协程任务
输出:--> <built-in function sleep>
     wusir is eating
     in eat:  <function sleep at    0x0000023D56A7CDC8>
     小马哥 is sleeping
     wusir finished eat
     小马哥 finished sleep

获取返回值

# import time
# import gevent
# from gevent import monkey
# monkey.patch_all()
# def eat():
#     print('wusir is eating')
#     time.sleep(1)
#     print('wusir finished eat')
#     return 'wusir***'
#
# def sleep():
#     print('小马哥 is sleeping')
#     time.sleep(1)
#     print('小马哥 finished sleep')
#     return '小马哥666'
#
# g1 = gevent.spawn(eat)
# g2 = gevent.spawn(sleep)
# gevent.joinall([g1,g2])#阻塞完了才能取返回值
# print(g1.value)
# print(g2.value)

输出: wusir is eating
      小马哥 is sleeping
       wusir finished eat
       小马哥 finished sleep
       wusir***
       小马哥666

 

9.7.4 asyncio

import asyncio

# 起一个任务
# async def demo():   # 协程方法
#     print('start')
#     await asyncio.sleep(1)  # 阻塞
#     print('end')
#
# loop = asyncio.get_event_loop()  # 创建一个事件循环
# loop.run_until_complete(demo())  # 把demo任务丢到事件循环中去执行

# 启动多个任务,并且没有返回值
# async def demo():   # 协程方法
#     print('start')
#     await asyncio.sleep(1)  # 阻塞
#     print('end')
#
# loop = asyncio.get_event_loop()  # 创建一个事件循环
# wait_obj = asyncio.wait([demo(),demo(),demo()])
# loop.run_until_complete(wait_obj)

# 启动多个任务并且有返回值
# async def demo():   # 协程方法
#     print('start')
#     await asyncio.sleep(1)  # 阻塞
#     print('end')
#     return 123
#
# loop = asyncio.get_event_loop()
# t1 = loop.create_task(demo())
# t2 = loop.create_task(demo())
# tasks = [t1,t2]
# wait_obj = asyncio.wait([t1,t2])
# loop.run_until_complete(wait_obj)
# for t in tasks:
#     print(t.result())

# 谁先回来先取谁的结果
# 谁先回来先取谁的结果
# import asyncio
# async def demo(i):   # 协程方法
#     print('start')
#     await asyncio.sleep(10-i)  # 阻塞
#     print('end')
#     return i,123
#
# async def main():
#     task_l = []
#     for i in range(10):
#         task = asyncio.ensure_future(demo(i))
#         task_l.append(task)
#     for ret in asyncio.as_completed(task_l):
#         res = await ret
#         print(res)
#
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())

语法

# await 阻塞 协程函数这里要切换出去,还能保证一会儿再切回来
    # await 必须写在async函数里,async函数是协程函数
    # loop 事件循环
    # 所有的协程的执行 调度 都离不开这个loop

python原生的底层的协程模块

  • 爬虫 webserver框架

  • 提高网络变成的效率和并发效果

推荐阅读