首页 > 技术文章 > 🍖Python并发编程之多进程

songhaixing 2021-01-23 23:32 原文

引入

在进入多进程的学习之前, 一定需要先了解一个应用程序是如何开启一个进程的, 以及操作系统对进程是如何进行分配资源的, 进程、线程、进程池、进程三态、同步、异步、并发、并行、串行的概念也要非常的明确, 下面将介绍 Python 并发编程之多进程

一.multiprocessing 模块介紹

1.什么是 multiprocessing 模块

  • multiprocess 模块是 Python 中的多进程管理模块

2.multiprocessing 模块简介

  • python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程, Python提供了multiprocessing

3.multiprocessing 模块的作用

  • multiprocessing 模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似
  • multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件

ps : 值得注意的是 : 与线程不同,进程没有任何共享状态,多个进程的内存空间相互物理隔离, 进程修改的数据,改动仅限于该进程内

二.Process类介绍

multiprocessing 模块提供了 Process 类,该类可用来在 Windows 平台上创建新进程

使用 Process 类创建实例化对象,其本质是调用该类的构造方法创建新进程

Process([group [, target [, name [, args [, kwargs]]]]])  # 实际上是调用了下面的构造方法
def __init__(self,group=None,target=None,name=None,args=(),kwargs={})

值得注意的是 :

​ 参数的指定需要使用关键字的方式

args 指定的值是为 target 指定的函数的位置参数, 并且是一个元组形式, 一个值必须带逗号

  • 参数含义 :

参数名 说明
group 该参数未进行实现,不需要传参
target 为新建进程指定执行任务,也就是指定一个函数
name 为新建进程设置名称
args 为 target 参数指定的参数传递非关键字参数
kwargs 为 target 参数指定的参数传递关键字参数
  • 常用方法

方法 作用
run( ) 第 2 种创建进程的方式需要用到,继承类中需要对方法进行重写,该方法中包含的是新进程要执行的代码
start( ) 和启动子线程一样,新创建的进程也需要手动启动,该方法的功能就是启动新创建的线程
join([timeout]) 主线程等待子进程终止(强调:是主线程处于等的状态,而p是处于运行的状态),timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
is_alive( ) 判断当前进程是否还活着
terminate( ) 中断该进程
  • 常用属性

属性 作用
name 可以为该进程重命名,也可以获得该进程的名称。
daemon 和守护线程类似,通过设置该属性为 True,可将新建进程设置为“守护进程”
pid 返回进程的 ID 号。大多数操作系统都会为每个进程配备唯一的 ID 号
exitcode 进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
authkey 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网

三.Process类创建子进程的两种方式

0.Process 类使用的注意点

WindowsProcess( ) 必须放在 if __name__ == '__main__': 之下

  • 这是 Windows上多进程的实现问题, 在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错, 所以必须把创建子进程的部分用那个 if 判断保护起来,import 的时候如果不是当前执行文件就不会执行 Process, 也就不会无限递归了 (Linux上没有这个问题)

ps : fork 是 OS提供的方法 os.fork(), 该方法可以在当前程序中再创建出一个进程, 但是在 Windows 平台上无效, 只在 Linux, UNIX, Mac OSX上有效

1.开启子进程方式一

  • 直接创建 Process 类的实例对象,由此就可以创建一个新的进程
from multiprocessing import Process
import time,os

def test(n):
    print(f"父进程{os.getppid()},紫禁城{os.getpid()}")
    time.sleep(n)
    print(f"父进程{os.getppid()},紫禁城{os.getpid()}")

if __name__ == '__main__':
    p = Process(target=test,args=(2,))
    p.start()  # 做发起系统调用的活
    print(f"当前执行文件{os.getpid()}")

'''
当前执行文件16860
父进程16860,紫禁城6404
父进程16860,紫禁城6404
'''

2.开启子进程方式二

  • 通过继承 Process 类的子类,创建实例对象,也可以创建新的进程
  • 继承 Process 类的子类需重写父类的 run( ) 方法
from multiprocessing import Process
import time,os

class MyProcess(Process):
    def __init__(self,n):
        super().__init__()
        self.n = n

    def run(self) -> None:
        print(f"父进程{os.getppid()},紫禁城{self.pid}")
        time.sleep(self.n)
        print(f"父进程{os.getppid()},紫禁城{os.getpid()}")

if __name__ == '__main__':
    p = MyProcess(2)
    p.start()
    print(f"当前执行文件{os.getpid()}")
    
'''
当前执行文件8136
父进程8136,紫禁城1280
父进程8136,紫禁城1280
'''

四.验证进程的内存空间是相互隔离的

from multiprocessing import Process
import time

x = 222

def test():
    global x
    x = 111

if __name__ == '__main__':
    p = Process(target=test)
    p.start()     # 发送系统调用
    time.sleep(1) # 等待子进程运行完
    print(x)      # 222 (还是原来的)

子进程 test 函数中声明全局变量 x, 并修改 x 的值, 等待子进程运行完毕, 最后打印 x , 发现 x 的值并没有改变

五.Process 对象的 join 方法

  • 让父进程等待子进程的终止, 父进程在等, 子进程在运行
from multiprocessing import Process

x = 222

def test():
    global x
    x = 111

if __name__ == '__main__':
    p = Process(target=test)
    p.start()   # 发送系统调用
    p.join()    # 等待子进程运行完(之前我们使用sleep并不能精确的知道子进程结束运行的时间)
    print(x)    # 222 (还是原来的)
  • 参数 timeout 是可选的超时间, 等多久就不等了
from multiprocessing import Process

x = 222

def test():
    global x
    x = 111

if __name__ == '__main__':
    p = Process(target=test)
    p.start()        # 发送系统调用
    p.join(0.001)    # 等待 0.001 秒就不等了
  • 注意点 : start() 只是发起系统调用, 并不是运行子进程, 当 start() 执行完后紧接着就执行后面的代码
  • start() 发起调用之后, 是通知操作系统创建一个子进程, 操作系统需要申请一个内存空间, 将父进程的数据复制一份到子进程的内存空间中作为初始化用 (Linux是将父进程的数据原原本本的复制一份, 而Windows 稍有些不同), 然后子进程才运行起来
import time,os

def test(n):
    time.sleep(n)
    print(f"父进程{os.getppid()} 子进程{os.getpid()}")

if __name__ == '__main__':
    p1 = Process(target=test,args=(3,))
    p2 = Process(target=test,args=(2,))
    p3 = Process(target=test,args=(1,))

    p1.start()  # 用时 3 秒
    p2.start()  # 用时 2 秒
    p3.start()  # 用时 1 秒

    start_time = time.time()
    p1.join()
    p2.join()
    p3.join()   # 三个进程都在并发的运行, 主进程一共运行3秒多
    stop_time = time.time()
    print(f'主进程{os.getpid()} 用时{stop_time-start_time}')
    
'''
父进程10888 子进程6792
父进程10888 子进程13368
父进程10888 子进程14800
主进程10888 用时3.131737470626831
'''

六. Process 对象其他常用方法介绍

1.terminate( ) : 关闭进程

2.is_alive( ) : 查看进程是否存活

from multiprocessing import Process
import time

def test():
    time.sleep(1)

if __name__ == '__main__':
    p = Process(target=test,args=(2,))
    p.start()
    p.terminate()  # 只是发起系统调用, 通知操作系统关闭子进程
    print(p.is_alive())  # True

由上面可知 terminate() 只是发起系统调用, 并不是立即关闭子进程, 操作系统关闭子进程回收资源也要一小会, 我们可以使用sleep简单延时

from multiprocessing import Process
import time

def test():
    time.sleep(1)

if __name__ == '__main__':
    p = Process(target=test,args=(2,))
    p.start()
    p.terminate()   # 只是发起系统调用, 通知操作系统关闭子进程
    time.sleep(0.1) # 稍微延时一点
    print(p.is_alive())  # False

3.name : 为新建进程设置名字

4.pid : 进程号

from multiprocessing import Process
import time,os

class MyProcess(Process):
    def __init__(self,n,name):
        super().__init__()
        self.n = n
        self.name = name

    def run(self) -> None:
        time.sleep(self.n)
        print(f"子进程pid:{self.pid}")    # 子进程pid:14156
        print(f"子进程模块名:{__name__}")  # 子进程模块名:__mp_main__
        print(f"子进程名:{self.name}")    # 子进程名:aaaa

if __name__ == '__main__':
    p = MyProcess(1,"aaaa")
    p.start()
    p.join()
    print(f"打印子进程pid:{p.pid}")       # 打印子进程pid:14156
    print(f"打印主进程pid:{os.getpid()}") # 打印主进程pid:16340
    print(f"子进程名:{p.name}")           # 子进程名:aaaa
    print(f"主进程模块名:{__name__}")      # 主进程模块名:__main__

__name__ : Python中每个模块都有自己的名字, __name__是一个系统变量, 是模块的标识符, 值是模块的名称, 并且在自身模块中:__name__的值等于__mian__

七.孤儿进程

1.什么是孤儿进程

  • 当一个父进程创建了多个子进程, 子进程再创建子子进程等等
  • 父进程因正常运行完毕或其他情况被干掉的时候, 它的子进程就变成了孤儿进程
  • 为了避免孤儿进程完成任务后没有父亲通知操作系统回收资源
  • 于是 PID 为 "1"的顶级进程 systemd 就接手了这个孤儿进程
  • systemd 相当于一个孤儿院, 但凡是孤儿进程都会成为它的子进程

2.孤儿进程演示

  • 先在一个虚拟终端里开启一个 Bash 进程,把他当做父进程
  • 紧接着开启一个 "sleep 1000 &" 进程, 把它当做子进程
  • 然后在另一个虚拟终端查看这两个进程信息

孤儿进程bash和sleep演示

  • 再杀掉 sleep 的父进程 Bash 看看结果如何

杀掉父进程bash2

  • 图示

八.僵尸进程

1.什么是僵尸进程

  • 这是Linux出于好心的设计
  • 一个父进程开启了一堆子进程, 当子进程比父进程先运行完(死掉)
  • 操作系统会释放子进程占用的重型资源(内存空间, CPU资源, 打开的文件)
  • 但会保留子进程的关键信息(PID, 退出状态, 运行时间等)
  • 目的是为了让父进程能随时查看自己的子进程信息(不管该子进程有没有死掉)
  • 这种已经死掉的子进程都会进入僵尸状态, ''僵尸进程''是Linux系统的一种数据结构

ps : 任何正常结束的子进程都会进入到僵尸状态, 而被强制终止的进程的所有信息将会被清除

2.僵尸进程回收----概念

  • 操作系统保留子进程信息供父进程查看
  • 当父进程觉得不再需要查看的时候, 会向操作系统发送一个 wait / waitpid 系统调用
  • 于是操作系统再次清理僵尸进程的残余信息

3.僵尸进程回收----实际

  • 优秀的开源软件
这些软件在开启子进程时, 父进程内部会及时调用"wait" / "waitpid" 通知操作系统来回收僵尸进程
  • 水平良好的开发者
功底深厚,知道父进程要对子进程负责
会在父进程内部考虑到调用 "wait" / "waitpid" 通知操作系统回收僵尸进程
但是发起系统调用时间可能慢了一点
于是我们就可以使用 "ps aux | grep [z]+" 命令查看到僵尸进程
  • 水平非常低的开发者
技术半吊子,只知道开子进程,父进程也不结束,并在那一直开子进程,不知道什么是僵尸进程
系统调用 "wait" / "waitpid" 也没有听说过
于是计算机会堆积许多的僵尸进程,占用着大量的"pid",(每启动一个进程就会分配一个"pid号")
计算机进入一个奇怪的现象: 内存够用,硬盘充足,CPU空闲,但新的程序无法启动
这就是因为"PID"不够用了

4.如何清理僵尸进程

  • 针对良好的开发者
我们可以手动发信号给父进程: "# kill -CHLD [父进程的PID]"
通知父进程快点向操作系统发起系统调用 "wait" / "waitpid" 来清理变成僵尸的儿子们
  • 针对半吊子水平的开发者
这种情况子下,我们只能将父进程终结,因为你发给它的信号不会得到回应
父进程被杀死,"僵尸进程"将会变成"僵尸孤儿进程"
但凡是"孤儿进程"都会被Linux系统中"PID"为"1"的顶级进程"systemd"回收
"systemd"会发起系统调用 "wait" / "waitpid" 来通知操作系统清理僵尸进程
# Centos7 的顶级进程为 systemd
# Centos6 的顶级进程为 init

5.使用Process类制造僵尸进程

原本 multiprocessing 模块在你发起系统调用 start() 开启子进程的时候会自动检测当前状态下是否存在僵尸进程, 并将其回收, join() 调用也是一样, 我们可以查看这两个调用的源码进行查看 :

image-20210120172357714

image-20210120172459572

  • 我们可以让父进程创建子进程后暂停在原地什么事情都不做, 于是 multiprocessing 模块的底层机制都没有运行, 也就没法清除运行完毕并变成僵尸态的子进程, 下面再 Linux 上进行演示 :
# coding:utf-8
from multiprocessing import Process
import os,time

def task():
    print("子进程:%s"%os.getpid())
    time.sleep(4)  # 子进程 4 秒后结束变成僵尸进程

if __name__ == "__main__":
    for i in range(400):
        print("父进程:%s"%os.getpid())
        p = Process(target=task)
        p.start()
    time.sleep(100000)  # 让父进程停在原地什么也不做

使用 top 命令查看系统状态信息, 可以发现已经出现了 400 个僵尸进程

image-20210120183029242

我们可以通过 kill 刚运行的 py 文件将这些僵尸进程变成孤儿进程, 从而被 systemd 接管, systemd 再发起系统调用将其清除

九.守护进程

1.什么是守护进程

由主进程创建, 并会随着主进程的结束而结束

2.守护进程的生命周期

  • 进程之间是相互独立的, 守护进程会在主进程代码执行结束后就终止

  • 守护进程内无法再次开启子进程, 否则会抛出异常 : AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process
import os,time

class MyProcess(Process):
    def __init__(self,n):
        super().__init__()
        self.n = n

    def run(self) -> None:
        print(f'子进程:{os.getpid()}开始')
        time.sleep(2)
        print(f"子进程:{os.getpid()}结束")

if __name__ == '__main__':
    p = MyProcess(2)
    p.daemon = True  # 需要在 strat() 之前设置
    p.start()
    print(f"主进程:{os.getpid()}结束")  
    # 在当前主进程的代码已经运行完毕, 守护进程就会终止, 甚至守护进程还没来的急启动
'''输出
主进程:16924结束
'''

我们使用 sleep 让主进程简单延时一下好让子进程启动起来

from multiprocessing import Process
import os,time

class MyProcess(Process):
    def __init__(self,n):
        super().__init__()
        self.n = n

    def run(self) -> None:
        print(f'子进程:{os.getpid()}开始')
        time.sleep(2)
        print(f"子进程:{os.getpid()}结束")

if __name__ == '__main__':
    p = MyProcess(2)
    p.daemon = True
    p.start()
    time.sleep(1)  # 延时一秒, 足够操作系统将子进程开起来
    print(f"主进程:{os.getpid()}结束")

'''输出
子进程:8620开始
主进程:10480结束
'''

再次强调, 守护进程是在主进程的代码执行完毕终止

from multiprocessing import Process
import os,time

def Foo():
    print(f"Foo:{os.getpid()}-->111")
    time.sleep(1)
    print(f"Foo--->222")

def Bar():
    print(f"Bar:{os.getpid()}-->333")
    time.sleep(2)
    print(f"Bar--->444")

if __name__ == '__main__':
    p1 = Process(target=Foo)
    p2 = Process(target=Bar)

    p1.daemon = True  # 将 p1 设置守护进程
    p1.start()
    p2.start()
    print("------>end")
# 当运行到这一行的时候主进程代码已经运行完了, 那么守护进程也已经终止了, 与主进程在等着 p2 运行无关, 这时操作系统还没来的急启动 p1 这个子进程

'''输出
------>end
Bar:18124-->333
Bar--->444
'''

十.进程同步锁(互斥锁/排它锁)

上面我们实现了进程的并发, 进程之间的数据是不共享的, 但是他们可以共享同一个文件(硬盘空间), 或者是同一个打印空间, 然而在共享的同时也带来了问题 : 进程的运行不是同时进行的, 它们没有先后顺序, 一旦开启也不受我们的限制, 当多个进程使用同一份数据资源时, 就会引发数据安全或者数据混乱问题

1.什么是互斥锁

我们打个简单的比方, 公司里的一台打印机, 每个人都可以使用, 但同事只能有一个人在使用, 不然就会造成打印错乱; 又比如合租房的卫生间, 合住的同伴都可以使用卫生间, 但每次只能一个人进去, 进去之后门就锁上了(相当于加锁 Lock( ).acquire( )), 出来之后开门, 其他人又可以使用卫生间了(相当于解锁Lock( ).release( ))

  • 模拟多个用户共同使用同一份文件(抢票)

推荐阅读