首页 > 技术文章 > 并发编程笔记(1)——多进程、同步/异步/阻塞/非阻塞、守护进程、进程锁

lynlearnde 2020-08-10 19:28 原文

内容目录

  • 操作系统的历史
  • 进程

内容详细:

操作系统:

  • 一:隐藏了丑陋的硬件调用接口,为应用程序员提供调用硬件资源的更好,更简单,更清晰的模型(系统调用接口)。应用程序员有了这些接口后,就不用再考虑操作硬件的细节,专心开发自己的应用程序即可。
    例如:操作系统提供了文件这个抽象概念,对文件的操作就是对磁盘的操作,有了文件我们无需再去考虑关于磁盘的读写控制(比如控制磁盘转动,移动磁头读写数据等细节),

  • 二:将应用程序对硬件资源的竞态请求变得有序化
    例如:很多应用软件其实是共享一套计算机硬件,比方说有可能有三个应用程序同时需要申请打印机来输出内容,那么a程序竞争到了打印机资源就打印,然后可能是b竞争到打印机资源,也可能是c,这就导致了无序,打印机可能打印一段a的内容然后又去打印c...,操作系统的一个功能就是将这种无序变得有序。

进程:

  • 隐藏丑陋复杂的硬件接口,提供良好的抽象接口
  • 管理、调度进程,并且将多个进程对硬件的竞争变得有序

同步/异步/阻塞/非阻塞

多进程模块:multiprocessing

from multiprocessing import Process #多进程必用模块
def func():
    print(12345)

if __name__ == '__main__':
    p = Process(target=func)    # 把一个函数注册进这个进程里面
    # p是一个进程对象,还没有启动进程
    p.start()   #进程启动
    print('*'*10)
#多进程:带参数的函数
from multiprocessing import Process
def func(args):
    print(args)

if __name__ == '__main__':
    p = Process(target=func,args=('参数',))  #传参时必须是元组
    p.start()   #进程启动

os.pid:获取当前进程号

import os
from multiprocessing import Process #多进程必用模块
def func():
    print(12345)
    print('子进程:',os.getpid())
    print('子进程的父进程:',os.getppid())

if __name__ == '__main__':
    p = Process(target=func)    # 把一个函数注册进这个进程里面
    # p是一个进程对象,还没有启动进程
    p.start()   #进程启动
    print('*'*10)
    print('主进程:',os.getpid())
    print('主进程的父进程:',os.getppid())
  • 当前程序为主进程,如果新建了进程,并把函数注册进了该进程,则该进程与主进程同时运行。
  • 主进程的父进程为pycharm

进程的生命周期

  • 主进程
  • 子进程
  • 开启了子进程的主进程:
    • 主进程的代码如果长,等待主进程自己的代码执行结束
    • 子进程的执行时间长,主进程会在主进程代码执行完毕后等待子进程执行完毕之后,主进程才结束

进程中join的用法

  • 感知一个子程序的结束,将异步程序改为同步程序

    import time
    from multiprocessing import Process
    def func(arg1,arg2):
        print('*' * arg1)
        time.sleep(5)
        print('*' * arg2)
    
    
    if __name__ == '__main__':
        p = Process(target=func,args=(5,10))#传参时必须是元组
        p.start()   #进程启动
        p.join()    #是感知一个子程序的结束,将异步程序改为同步
        print('=========:执行结束')
    
  • 多个子进程执行完后,主程序再立即执行,需要用到join方法

    from multiprocessing import Process
    
    def func(arg1,arg2):
        print('*'*arg1)
        print('='*arg2)
    
    if __name__ == '__main__':
        p_list = []
        for i in range(10):
            p = Process(target=func,args=(10*i,20*i))
            p_list.append(p)
            p.start()
        [p.join() for p in p_list]  #p.join()相当于把子程序都执行完了,再运行主程序
        #子程序由异步变成同步
        print('主程序执行完毕')
    
  • 应用场景:写文件:同时开启500个进程写文件,异步执行,写文件执行完毕后打印目录

    • 注意,写文件执行完毕后需要用到join方法变为同步,然后再执行打印目录命令
    import os
    from multiprocessing import Process
    
    def func(filename,content):
        with open(filename,'w')as f:
            f.write(content*10*'*')
    
    if __name__ == '__main__':
        p_list = []
        for i in range(1,5):
            p = Process(target=func,args=('info%s'%i,i))
            p_list.append(p)
            p.start()
        [p.join() for p in p_list]  #之前所有进程必须都执行完才能执行下面的代码
        print([i for i in os.walk(r'D:\pycharm_project\python36')])
    

多个子程序运行

  • 启动多个子程序调用函数,同时执行

    import time
    from multiprocessing import Process
    
    def func(arg1,arg2):
        print('*'*arg1)
        time.sleep(5)
        print('*'*arg2)
    
    if __name__ == '__main__':          #启用多个子程序同时运行
        p1 = Process(target=func,args=(3,10))            #传参
        p1.start()                  #启动子进程
        p2 = Process(target=func,args=(4,11))
        p2.start()
        p3 = Process(target=func,args=(5,12))
        p3.start()
    
  • 也可以使用for循环的方法

    import time
    from multiprocessing import Process
    
    def func(arg1,arg2):
        print('*'*arg1)
        time.sleep(5)
        print('*'*arg2)
    
    if __name__ == '__main__':
        for i in range(4):			#使用for循环该进程,相当于开启了4个子进程
            p1 = Process(target=func,args=(3,10))
            p1.start()
    

继承类的多线程方法

  • 自定义类,必须继承Process类

  • 必须实现一个run方法,run方法中是在子进程中执行的代码

  • start方法自动处理run方法

    import os
    from multiprocessing import Process
    
    class MyProcess(Process):
        def run(self):
            print(os.getpid())        #要执行的子进程方法
    
    if __name__ == '__main__':
        print('主:',os.getpid())
        p1 = MyProcess()        #实例化类
        p1.start()              #start方法自动处理run方法
        p2 = MyProcess()        #实例化类
        p2.start()              #启动第二个子进程
    
    # 自定义类,必须继承Process类
    # 必须实现一个run方法,run方法中是在子进程中执行的代码
    
  • 传参的方式

    import os
    from multiprocessing import Process
    
    class MyProcess(Process):
        def __init__(self,arg1,arg2):
            super().__init__()      
            self.arg1 = arg1
            self.arg2 = arg2
    
    
        def run(self):
            print(self.pid)
            print(self.name)
            print(self.arg1)
            print(self.arg2)
    
    if __name__ == '__main__':
        print('主:',os.getpid())
        p1 = MyProcess(1,2)        #实例化类
        p1.start()              #启动该子进程
        p2 = MyProcess(3,4)        #实例化类
        p2.start()              #启动第二个子进程
    

多进程之间的数据隔离问题

  • 子进程中的数据无法修改主进程中的数据

    import os
    from multiprocessing import Process
    
    def func():
        global n        #声明了一个全局变量
        n = 0           #重新定义一个全局变量的n
        print('子进程pid:%s'%os.getpid(),n)
    
    if __name__ == '__main__':  #windows系统下必须使用main
        n = 100
        p = Process(target=func)
        p.start()
        p.join()
        print('主进程pid:%s'%os.getpid(),n)	
    
    #结果为:
    子进程pid:432 0			#在子进程中声明了修改一个全局变量,但是主进程的变量并没有发生改变
    主进程pid:6696 100
    

socket多进程示例:

  • 开启多个client端,实现多并发

  • 待优化:多进程会增加服务器内存损耗

    # server:
    import socket
    from multiprocessing import Process
    
    def serve(conn):
        ret = '你好'.encode('utf-8')
        conn.send(ret)
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        conn.close()
    
    if __name__ == '__main__':
        sk = socket.socket()
        sk.bind(('127.0.0.1',8080))
        sk.listen()
        while True:
            conn,addr = sk.accept()
            p = Process(target=serve,args=(conn,))
            p.start()
        sk.close()
        
    # client:
    import socket
    
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    msg = sk.recv(1024).decode('utf-8')
    print(msg)
    msg2 = input('>>>').encode('utf-8')
    sk.send(msg2)
    sk.close()
    

守护进程

  • 守护进程会随着主进程的代码执行完毕而结束,而不是主进程结束

  • 一定是在子进程start之前设置

    import time
    from multiprocessing import Process
    
    def func():
        while True:
            time.sleep(0.2)
            print('我还活着')
    
    def func2():
        print('in func2 start')
        time.sleep(8)
        print('in func2 finished')
    
    if __name__ == '__main__':
        p = Process(target=func)
        p.daemon = True         #设置该子进程为守护进程
        p.start()
        p2 = Process(target=func2)
        p2.start()
        i = 0
        while i < 5:
            print('我是socket server')
            time.sleep(1)
            i+=1
    # func随着主进程的代码执行结束而结束,而func2子进程继续执行,执行完毕后,主进程整体结束并关闭
    

强制结束子进程

  • 在主进程内结束一个子进程p.terminate()

    • 结束一个进程不会立即生效,需要操作系统响应的过程
    import time
    from multiprocessing import Process
    
    def func():
        while True:
            time.sleep(0.2)
            print('我还活着')
    
    def func2():
        print('in func2 start')
        time.sleep(5)
        print('in func2 finished')
    
    if __name__ == '__main__':
        p = Process(target=func)
        p.daemon = True         #设置该子进程为守护进程
        p.start()
        p2 = Process(target=func2)
        p2.start()
        p2.terminate()          #结束一个子进程
        print(p2.is_alive())    #立即检验子进程存活状态,此时子进程活着,因为执行后需要操作系统响应的过程
        time.sleep(1)
        print(p2.is_alive())    #检验一个进程是否还活着(True/Flase)
        print(p.name)           #输出子进程的名字
    

判断子进程是否存活:

  • p.is_alive() -------进程中的方法(True / Flase)
  • p.name -------输出子进程的名字

进程锁(重点)

  • 必须保证数据安全,所以异步变为同步执行修改数据的代码

    • 关键代码加锁
    • 进程1来了后先拿钥匙进入代码,后续进程为阻塞状态
    • 进程1执行关键代码,执行完毕后把钥匙归还
    • 此时进程2可以拿到钥匙了,进入关键代码,后续进程为阻塞状态
    • 依次循环,直到最后一个进程结束。

  • 进程锁应用:显示余票和买票

    #ticket文件代码:
    {"ticket": 1}
    
    #主程序代码:
    import json
    import time
    from multiprocessing import Process
    from multiprocessing import Lock        #启用进程锁
    
    def show(i):
        with open('ticket')as f:
            dic = json.load(f)
            time.sleep(0.1)         #模拟拿数据时的网络延迟
        print('余票:%s'%dic['ticket'])
    
    def buy_ticket(i,lock):
        lock.acquire()  #拿钥匙进门,只能拿钥匙的进程进去,其他进程为阻塞状态,直到还钥匙为止
        with open('ticket')as f:
            dic = json.load(f)
            time.sleep(0.2)
        if dic['ticket'] > 0:
            dic['ticket'] -= 1
            print('\033[32m%s买到票了\033[0m'%i)
        else:
            print('\033[31m%s没票了\033[0m'%i)
        time.sleep(0.1)
        with open('ticket','w') as f:
            json.dump(dic,f)
        lock.release()  #还进程锁的钥匙,此时下一个阻塞进程进场
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=show,args=(i,))
            p.start()
        lock = Lock()           #拿到锁对象
        for i in range(10):
            p = Process(target=buy_ticket,args=(i,lock))
            p.start()
    

推荐阅读