首页 > 技术文章 > 11.并发编程之进程

journeyer-xsh 2020-07-20 13:19 原文

一、操作系统基础

定义:操作系统是一个协调、管理和控制计算机硬件资源和软件资源的控制程序。

操作系统本质位于计算机硬件和软件之间,本质也是一个软件。操作系统由操作系统的内核(运行于内核态,管理硬件资源)以及系统调用(运行于用户态,为应用程序员写的应用程序提供系统调用接口)两部分组成。

操作系统的发展史:

  • 第一代计算机(1940~1955):真空管和穿孔卡片
  • 第二代计算机(1955~1965):晶体管和批处理系统
  • 第三代计算机(1965~1980):集成电路芯片和多道程序设计
  • 第四代计算机(1980~至今):个人计算机

多道技术:

  • 实现并发,现在是多核,但是每个核都会用到多道技术

  • 空间上的复用:内存中有多道程序

  • 时间上的复用:复用一个cpu的时间片

二、多进程基础

2.1 进程基础

进程:就是一个过程或一个任务,与程序的区别是程序只是一些代码,但是进程是程序的运行过程,程序运行起来就是一个进程,一个进程运行两次是两个进程。进程也是计算中最小的资源分配单位。

pid:进程的唯一标识,

2.2 并发和并行

并发:伪并行,即多个程序轮流在一个cpu上运行,像多个程序在同时运行。单个cpu+多道技术可实现并发。

并行:多个程序同时运行,在多个cpu上运行。

实现:单核下,可以利用多道技术,多核,每个核也都可以利用多道技术(多道技术是针对单核而言的)有4个核,5个任务,这样同一时间有4个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4;一旦其中一个遇到I/O被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术。

image-20200706173416284

多道技术:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各自运行一段时间,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并行,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)。

2.3 同步\异步和阻塞\非阻塞

同步:就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。按照这个定义,其实绝大多数函数都是同步调用。简单来说就是做A事的时候发起B事,必须等待B事结束。

异步的概念和同步相对。当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、通知或回调来通知调用者。简单来说就是做A事的时候发起B事件,不需要等待B事件结束就可以继续A事件。

阻塞:cpu不工作,等待状态,input,accept,recv,recvfrom,sleep,connect

非阻塞:cpu在工作

三、多进程

进程的三状态图

image-20200707101309890

进程的调度算法:给所有的进程分配资源或分配cpu的使用权的一种方法。

短作业优先,先来先服务,多级反馈算法,

线程是进程中的一个单位,不能脱离进程存在,线程是计算中能被cpu调度的最小单位。

四、multiprocessing模块

4.1 创建子进程

pid --> process id

ppid --> parent peocess id

from multiprocessing import Process
import os
def func():
  	# 获取进程id和父进程id
    print(os.getpid(),os.getppid())


if __name__ == '__main__':
  	# 只会在主进程中执行一次,这需要将只在主进程中执行的代码放在main下面。
    print('main', os.getpid(),os.getppid())
    p = Process(target=func) 
    p.start()       # 此处开启了一个子进程   

在子进程中会自动import 父进程中所有的文件,如果不加if __name__ == '__main__': ,那么创建子进程时会自动执行了p =process(tartget=func)这会创建子进程的子进程,导致循环。

在Linux中会把父进程中的内容复制一份到子进程中,而不是import 父进程的代码,通过fork来完成的。

给子进程传递参数:在 p = Process(target=func) 传递参数,p = Process(target=func,args=('a')) 传递的args参数必须是一个元组。

能不能获取子进程的返回值-->不能

可以开启多个子进程

# 多元的处理进程的模块
from multiprocessing import Process
import os
import time

def func(name):
    time.sleep(5)
    print(os.getpid(),os.getppid(), name)

if __name__ == '__main__':
    print('main', os.getpid(),os.getppid())
    p = Process(target=func,args=('an',)) 
    p.start()       # 此处开启了一个子进程   
    p = Process(target=func,args=('bn',)) 
    p.start()
    p = Process(target=func,args=('cn',)) 
    p.start()

父进程和子进程是同步的,必须先等父进程创建完毕之后再创建子进程,start()是异步非阻塞,三个子进程几乎是同时创建完成的。

4.2 同步阻塞

# 相关概念
# 同步阻塞
    # 调用函数必须等待结果\cpu没工作 input sleep recv accept connect get
# 同步非阻塞
    # 调用函数必须等待结果\cpu工作 - 调用了一个高计算的函数 strip eval('1+2+3') sum max min sorted
# 异步阻塞
    # 调用函数不需要立即获取结果,而是继续做其他的事情,在获取结果的时候不知道先获取谁的,但是总之需要等(阻塞)
# 异步非阻塞
    #  调用函数不需要立即获取结果,也不需要等 start() terminate()

p.join的作用

from multiprocessing import Process
import os
import time

def func(name):
    time.sleep(2)
    print('发送给%s的邮件'%(name))

if __name__ == '__main__':
    arg_lst = [('an',), ('bn',), ('cn',)]
    for arg in arg_lst:
        p = Process(target=func,args=arg) 
        p.start()
    		# p.join()
    print('邮件发送完毕')
# 打印结果
# 邮件发送完毕
# 发送给an的邮件
# 发送给bn的邮件
# 发送给cn的邮件

导致这种结果的原因:每个子进程的创建之间是异步的,创建子进程和print()之间也是异步的,但是发送邮件的func函数是有延时的,导致先打印“邮件发送完毕”再打印3句发送给...的邮件,因为这3个子进程的创建之间是异步的,所以这3句话会同时打印。

p.join()的作用 :等p这个进程执行完毕再去执行后续代码,可以理解为将上述的子进程创建和print变成同步关系。如果将上述的p.join()注释掉,那么就变成了创建一个进程join一次,即顺序执行an,bn,cn这三个子进程,这样就将子进程的创建变成了同步关系。

可以将上述代码改为:

from multiprocessing import Process
import os
import time
import random

def func(name):
    time.sleep(random.random())
    print('发送给%s的邮件'%(name))

if __name__ == '__main__':
    arg_lst = [('an',), ('bn',), ('cn',)]
    p_lst = []
    for arg in arg_lst:
        p = Process(target=func,args=arg) 
        p.start()
        p_lst.append(p)
    for p in p_lst:
        p.join()
    print('邮件发送完毕')
# 打印结果
# 发送给bn的邮件
# 发送给an的邮件
# 发送给cn的邮件
# 邮件发送完毕

这样就是异步的了,如果在 p_lst.append(p)后面直接p.join(),那么他就是异步的了。

4.3 异步阻塞

多进程之间的数据是否隔离?是,即所占内存是隔开的。

n = 0
def func():
    global n 
    n += 1

if __name__ == '__main__':
    p_1 = []
    for i in range(100):
        p = Process(target=func)
        p.start()
        p_1.append(p) 
    for p in p_1:
        p.join() 
    print(n)
# 打印结果 0

4.4 多进程实例:使用多进程实现一个并发的socketserver

# 客户端
import time
import socket

sk = socket.socket()
sk.connect(('127.0.0.1',9001))

while True:
    sk.send(b'hello')
    msg =sk.recv(1024).decode('utf-8')
    print(msg)
    time.sleep(0.5)

sk.close()
# 服务端
import socket
from multiprocessing import Process


def talk(conn):
    while True:
        data = conn.recv(1024).decode('utf-8')
        data = data.upper().encode('utf-8')
        conn.send(data)

if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1', 9001))
    sk.listen()

    while True:
        conn, addr = sk.accept()
        Process(target=talk, args=(conn,)).start()

    sk.close()

4.5 通过面向对象的方式开启多进程

# 通过面向对象的方法开启进程
import os
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,a,b,c):		# 实现init方法才能传参
        self.a = a
        self.b = b
        self.c = c
        super().__init__()
# 不调用父类的init方法,会报AttributeError: 'MyProcess' object has no attribute '_closed'
    def run(self):
        time.sleep(1)
        print(os.getpid(), os.getppid())


if __name__ == '__main__':
    print('本进程pid',os.getpid())
    for i in range(10):
        p = MyProcess(1,2,3)
        p.start()

4.6 process类的其它方法

方法名(属性名) 描述
name 查看进程名字
is_alive() 查看进程是否还活着,返回bool
terminate() 强制杀死一个进程
daemon() 在启动一个进程之前设置为守护进程

五、守护进程

主进程会等待所有的子进程结束,为了回收资源

守护进程会等待主进程的代码执行结束之后再结束,而不是等待整个主进程结束。

守护进程也是子进程,所以要在主进程结束前结束,主进程要给守护进程回收资源,守护进程和其他子进程的执行进度无关。

p = Process(target=func)
p.daemon = True		# 设置p为守护进程
p.start()
p.join()		# 等待其他进程结束才结束

守护进程可以向某个服务端汇报主进程的情况,也可以使用zabbix框架。

六、进程同步 Lock

锁:会降低程序的运行效率,保证了进程之间数据安全的问题。

from multiprocessing import Lock,Process
import time

def func(i, lock):
    lock.acquire()      # 拿钥匙
    print('被锁起来的代码%s'%(i))
    lock.release()      # 还钥匙
    time.sleep(1)

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=func, args=(i ,lock))
        p.start()

七、进程通信 Queue

# 进程之间通信 IPC Inter Process communication
    # 基于文件:同一台机器上的多个进程之间通信
        # 基于socket的文件级别的通信来完成传递
        # Queue 队列
    # 基于网络:同一台机器或多台机器上的多进程间的通信
        # 第三方工具(消息中间件)
            # memcache(基本不用)
            # redis
            # rabbitmq
            # kafka

紧耦合程序和松耦合程序。

7.1 队列

队列 ipc 进程之间通信 -- 数据安全

基于socket\pickle\Lock实现

pipe管道基于socket\pickle实现的

from multiprocessing import Queue,Process
def son(q):
    q.put('hello')
    

def func(q):
    print(q.get())  # 放进去多少个就取多少个,否则阻塞

if __name__ == '__main__':
    q = Queue()
    Process(target=son,args=(q,)).start()
    Process(target=func,args=(q,)).start()
    # 下面的get()会阻塞住,因为一个put对应一个get,这里的get会等待往队列中放数据再get。
    # # print(q.get())

7.2 生产者消费者模型

1、爬虫

2、分布式操作:celery分布式框架:celery定时分布任务,celery机制

​ 本质:就是让生产数据和消费数据的效率达到平衡并且最大化的效率。

from multiprocessing import Queue,Process
import time
import random

# 多个生产者一个消费者
def consumer(q, name):        # 消费者:通常取到数据之后还要进行某些操作
    # for i in range(10):
    #     print(q.get())
    while True:
        food = q.get()
        if food:
            print('%s吃了%s'%(name, food))
        else:
            break

def producer(q, name, food):        # 生产者:通常在放数据之前就通过某些代码获取数据
    for i in range(10):
        foodi = '%s%s'%(food, i)
        print('%s生产了%s'%(name,foodi))
        time.sleep(random.random())
        q.put(foodi)

if __name__ == '__main__':
    q = Queue()
    c1 = Process(target=consumer, args=(q,'消费者'))
    p1 = Process(target=producer, args=(q,'生产者1', '食物'))
    p2 = Process(target=producer, args=(q,'生产者2', '狗屎'))
    c1.start()
    p1.start()
    p2.start()
    # 等待数据全部放进去之后再put(None)用于终止消费者的循环
    p1.join()
    p2.join()
    q.put(None)

异步阻塞:谁先来执行谁

# 异步阻塞加生产者-消费者模型
import requests
from multiprocessing import Process,Queue
url_dic = {
    'cnblogs':'',
    'baidu':'https://www.baidu.com',   											        'gitee':'https://gitee.com/old_boy_python_stack__22/teaching_plan/issues/IX SRZ',
}

def producer(name,url,q):
    ret = requests.get(url)
    q.put((name,ret.text))

def consumer(q):
    while True:
        tup = q.get()
        if tup is None:break
        with open('%s.html'%tup[0],encoding='utf-8',mode='w') as f:
            f.write(tup[1])

if __name__ == '__main__':
    q = Queue()
    pl = []
    for key in url_dic:
        p = Process(target=producer,args=(key,url_dic[key],q))
        p.start()
        pl.append(p)
    Process(target=consumer,args=(q,)).start()
    for p in pl:p.join()
    q.put(None)

推荐阅读