首页 > 技术文章 > <六>进程、线程、协程

shikaishikai 2018-08-06 22:32 原文

<1>进程与线程

进程:系统为程序执行分配的资源,是资源的集合,是程序的一次执行活动

线程:实际就是系统调度的一堆指令,是进程中的实际运作单位。一个进程中可以并发多个线程,每条线程并行执行不同的任务。

区别:①线程共享创建它的进程的地址空间;进程有自己的地址空间

   ②线程直接访问数据段的过程;进程有自己的复制父进程的数据段

      ③线程可以直接与其他线程交流;进程必须使用进程间通信和同胞交流过程

      ④主线程可能会影响子线程的功能,各进程之间互不影响、

形象比喻:进程是房间,线程是房间里的人;两者无法比较运行快慢,但启动速度是线程快

Python threading模块

格式一:直接调用

  join()方法:等待每一个线程完成

  setDaemon()方法:把当前进程设置为守护进程(当主进程推出后整个进程结束)

#!_*_coding:utf-8_*_
#__author__:"shikai"
import threading,time
#第一种写法
t_obj=[]
def run(n):
    print("text:",n)
    time.sleep(2)
    print("text done....",n)
start_time=time.time()
for i in range(50):
    t=threading.Thread(target=run,args=("t-{}".format(i),))  #实例化子线程
    t.setDaemon(True) #把当前进程设置为守护进程(当主进程推出后整个进程结束)
    t.start()
    t_obj.append(t)  #为了join()不阻塞后面程序
# for i in t_obj:
#     t.join() #=wait()   #每一个线程结束
#判断主进程和活跃进程
print("main thread done...",threading.current_thread(),threading.active_count())
print("time:",time.time()-start_time)

格式二:类继承方式

#!_*_coding:utf-8_*_
#__author__:"shikai"
import threading,time
#第二种类的写法
class MyThread(threading.Thread):
    def __init__(self,n):
        super(MyThread,self).__init__()
        self.n=n
    def run(self):
        print("text:", self.n)
        time.sleep(2)
        print("text done.....",self.n)
start_time=time.time()
t1=MyThread("t1")
t2=MyThread("t2")
t1.start()
t2.start()
t1.join()   #  等待t1线程执行完毕
t2.join()  #   等待t1线程执行完毕
print("main thread done ....")
print("time:",time.time()-start_time)

<2>线程锁(互斥锁Mutex)

多个线程可以共享同一份数据,就会产生一份数据还没修改完另一个线程就来取这份数据,导致最后数据不正确,添加线程锁使之成为串行,保证一份数据同时间只能由一个线程处理。

#!_*_coding:utf-8_*_
#__author__:"shikai"
import time,threading
num=0
def run(n):
    lock.acquire()    #lock=串行线程  让一个线程执行完之后在执行另一个线程
    global num
    num+=1
    print(num)
    time.sleep(1)
    lock.release()     #锁释放
lock = threading.Lock()       #实例化线程锁
t_obj=[]
for i in range(5):
    t=threading.Thread(target=run,args=("t-{}".format(i),))
    #t.setDaemon(True) #把当前进程设置为守护进程(当主进程推出后整个进程结束)
    t.start()
    t_obj.append(t)  #为了join()不阻塞后面程序
for i in t_obj:
    t.join() #=wait()   #每一个线程结束
print("main thread done ....")

<3>RLock(递归锁)

 就是在大锁中加入小锁(比如进入两道门不要两把不同的锁)

#!_*_coding:utf-8_*_
#__author__:"shikai"
import threading, time
def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num
def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2
def run3():
    lock.acquire()   
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)
if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()  #递归锁
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

<4>Semaphore(信号量)

Semaphore是同时允许一定数量的线程更改数据,设置能同时运行的线程数。例如一个厕所有3个坑位(就是信号量),允许3个人同时运行

#!_*_coding:utf-8_*_
#__author__:"shikai"
#信号量--容量一定线程数同时执行
import time,threading
num=0
def run(n):
    semaphore.acquire()    #  设置锁
    global num
    num+=1
    print(num)
    time.sleep(1)
    semaphore.release()   #释放锁
semaphore = threading.BoundedSemaphore(5)   #最大允许5个线程
t_obj=[]
for i in range(50):
    t=threading.Thread(target=run,args=("t-{}".format(i),))  #把run函数变成子线程
    #t.setDaemon(True) #把当前进程设置为守护进程(当主进程推出后整个进程结束)
    t.start()
    t_obj.append(t)  #为了join()不阻塞后面程序
for i in t_obj:
    t.join() #=wait()   #每一个线程结束
print("main thread done ....")

<5>Events

通过Event来实现两个或多个线程间的交互,例如:车是一个线程,红绿灯是一个线程,两者通过event标志位来判断状态。

#!_*_coding:utf-8_*_
#__author__:"shikai"
import threading,time
event=threading.Event()
def light():
    count = 0
    event.set()  # 标志位建立
    while True:
        if count>4 and count<10:  #  此时为红灯状态
            event.clear()  #清空标志位
            print("\033[41;1mred light is on...\033[0m")
        elif count>10:
            event.set()    #建立绿灯标志位
            count=0
        else:
            print("\033[42;1m green light is on ....\033[0m")
        time.sleep(1)
        count+=1
def car(name):
    while True:
        if event.is_set():  # 此时在等红灯 (判断标志位状态)
            print("{} is running...".format(name))
            time.sleep(1)
        else:
            print("{} is waiting red light..".format(name))
            event.wait()
            print("ok red ")

lighter=threading.Thread(target=light,)
lighter.start()
car1=threading.Thread(target=car,args=("奥迪",))
car1.start()

<6>queue队列

#优点:1:解耦—使程序直接实现松耦合
# 2:提高处理效率
queue.Queue(maxsize=0) #先入先出
queue.LifoQueue() #像栈一样后进先出  可设置maxsize=0
queue.PriorityQueue()  #存储数据时可设置优先级  可设置maxsize=0
#!_*_coding:utf-8_*_
#__author__:"shikai"
import queue
#优点:1:解耦—使程序直接实现松耦合
#      2:提高处理效率

q1=queue.Queue()  #先入先出 可设置maxsize=0
q1.put(1)
q1.put(2)
q1.put(3)
print(q1.get())
print(q1.get())
print(q1.get())

q2=queue.LifoQueue() #像栈一样后进先出  可设置maxsize=0
q2.put(1)
q2.put(2)
q2.put(3)
print(q2.get())
print(q2.get())
print(q2.get())

q3=queue.PriorityQueue()  #存储数据时可设置优先级  可设置maxsize=0
q3.put((1,"aaaa"))
q3.put((5,"bbbb"))
q3.put((3,"ccc"))
print(q3.get())
print(q3.get())
print(q3.get())

队列应用:生产者和消费者模型

#!_*_coding:utf-8_*_
#__author__:"shikai"
import time,threading,queue
q=queue.Queue(maxsize=10)
def producer(name):
    num = 1
    while True:
        q.put("骨头{}".format(num))
        print("{} 开始生产{}。。。".format(name,num))
        num+=1
        time.sleep(1)
def consumer(name):
    while True:
        print("{} 开始吃{}".format(name,q.get()))
        time.sleep(2)

producer=threading.Thread(target=producer,args=("shikai",))
consumer=threading.Thread(target=consumer,args=("kkk",))
producer.start()
consumer.start()

进程multiprocessing

多进程是一个包,它支持产卵过程使用一个API类似线程模块。多进程方案提供了本地和远程的并发性,有效地避开使用子流程而不是全局解释器锁的线程。因此,多进程模块允许程序员充分利用多个处理器在一个给定的机器。它运行在Unix和Windows。
注意:#args后面以元组格式 ,还有逗号

<1> Queue

实现进程间的数据交流
#!_*_coding:utf-8_*_
#__author__:"shikai"
from multiprocessing import Process,Queue #进程queue
def f(qq):
    qq.put("hello 子进程")
if __name__=="__main__":
    q=Queue()
    m=Process(target=f,args=(q,))   #实际上是父进程把队列复制给子进程  实现数据的传递
    m.start()
    print(q.get())

<2> Pipes

实现进程间的通讯,像socket一样

#!_*_coding:utf-8_*_
#__author__:"shikai"
from multiprocessing import Process,Pipe
#Pipe 实现两个进程间的通讯
def child(conn):
    t=conn.recv()    #接收父进程传递的消息
    print("from parrent:",t)
    conn.send("hello parrent")   #发送
    conn.close()

if __name__=="__main__":
    parrent_conn,child_conn=Pipe()    #实例化
    m=Process(target=child,args=(child_conn,))   #开启多进程实现父进程和子进程的通讯
    m.start()
    parrent_conn.send("hello child!")    
    t=parrent_conn.recv()
    print("from child:",t)
    m.join() 

<3> Manager

实现多个进程间的数据共享(manager默认加了锁一份数据同时只能一个进程修改,但各进程都可以修改)

#!_*_coding:utf-8_*_
#__author__:"shikai"
from multiprocessing import Process,Manager
#实现多个进程间的数据共享(manager默认加了锁一份数据同时只能一个进程修改,但各进程都可以修改)
import  os
def child(d,l):
    d["name"]='shikai'
    #l=[os.getpid()]
    l.append(os.getpid())
    print(l,d)

if __name__=="__main__":
    with Manager() as manager:
        d=manager.dict()     #生成一个字典,可在多个进程传递和共享  把字典和列表复制给每个进程
        l=manager.list()     #生成一个列表,可在多个进程传递和共享  把字典和列表复制给每个进程
        res_list=[]
        for i in range(5):
            m=Process(target=child,args=(d,l))  #把d l 传给子进程
            m.start()
            res_list.append(m)
        for res in res_list:  #等待每个进程结束
            res.join()
        print(d)
        print(l)

<四> Pool 进程池

和线程的信号量类似,允许同时执行进程数。

pool.apply(func=child,args=(i,))                                      #串行进程
pool.apply_async(func=child,args=(i,),callback=bar)     #多个进程同时执行,callback回调 等待child函数执行完之后在执行bar 函数

#!_*_coding:utf-8_*_
#__author__:"shikai"
from multiprocessing import Process,Pool
import os,time
def child(n):
    time.sleep(2)
    print("in the processing:",os.getpid())  #获得进程号id

def bar(n):         #主线程调用bar函数
    print("hello")

if __name__=="__main__":
    pool=Pool(3)        #实例化5个容量的进程池
    for i in range(10):   #开启10个进程
        #pool.apply(func=child,args=(i,))  #串行效果
        pool.apply_async(func=child,args=(i,),callback=bar)  #回调 等待child函数执行完之后在执行bar 函数
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

 

协程 

协程,又称微线程,协程是一种用户态的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在各协程来回切换的时候,恢复先前保存的寄存器上下文和栈,从而回到上次操作时的状态。

协程的优点:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

<1> greenlet模块

实例化协程对象之后,手动switch()方法,即可以来回切换协程

#!_*_coding:utf-8_*_
#__author__:"shikai"

from greenlet import greenlet
#手动切换协程
def text1():
    print(11)
    gre2.switch()
    print(33)
    gre2.switch()
def text2():
    print(22)
    gre1.switch()
    print(44)
gre1=greenlet(text1)   #启动一个协程
gre2=greenlet(text2)
gre1.switch()   #开始运行

< 2> gevent模块

Gevent是一种基于协程的Python网络库,它用到Greenlet提供的,封装了libevent事件循环的高层同步API。它让开发者在不改变编程习惯的同时,用同步的方式写异步I/O的代码。使用Gevent的性能确实要比用传统的线程高,甚至高很多。

#!_*_coding:utf-8_*_
#__author__:"shikai"
import gevent
#遇到IO就切换  优点是节约单线程执行时间(花费子函数最大sleep时间)
def func1():
    print("in the func1")           #  >>: 1
    gevent.sleep(3)
    print("in the func1 again")    #  >>: 6
def func2():
    print("in the func2")            #  >>: 2
    gevent.sleep(2)
    print("in the func2 again")      #  >>: 5
def func3():
    print("in the func3")            #  >>: 3
    gevent.sleep(1)
    print("in the func3 again")      #  >>: 4

gevent.joinall([
    gevent.spawn(func1),   #生成协程
    gevent.spawn(func2),
    gevent.spawn(func3),
])

通过gevent实现多个socket并发,monkey()方法把当前程序所有的IO操作都做好标记出来

server端:

#!_*_coding:utf-8_*_
#__author__:"shikai"
import gevent
import socket
import time

from gevent import socket,monkey
monkey.patch_all()

def server(ID,addr):
    s=socket.socket()
    s.bind((ID,addr))
    s.listen(500)
    while True:
        conn,addr=s.accept()
        gevent.spawn(hand_requst,conn)    #多并发多个conn

def hand_requst(conn):
    try:
        while True:
            data=conn.recv(1024)
            print("recv data:",data)
            conn.send(data)
            if not data:
                break
    except Exception as e:
        print(e)
    finally:
        conn.close()
if __name__=="__main__":
    server("localhost",1234)

client端:

#!_*_coding:utf-8_*_
#__author__:"shikai"
import socket
s=socket.socket()
s.connect(("localhost",1234))  #以元组形式给与参数
while True:
    msg=input(">>:")
    s.send(msg.encode())
    data=s.recv(1024)
    print("recv data:",data.decode())
s.close()

gevent与爬虫

#!_*_coding:utf-8_*_
#__author__:"shikai"
from urllib import request
import gevent,time
from gevent import monkey
monkey.patch_all()  #把当前程序所有的IO操作都做好标记出来

def func(url):
    print("GET:{}".format(url))
    resp=request.urlopen(url)
    data=resp.read()        #读取出url数据
    print("{} bytes received from: {}".format(len(data),url))
urls = ['https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/' ]
time_start=time.time()
for url in urls:
    func(url)
print('同步cost:',time.time()-time_start)

#异步操作
async_time=time.time()
gevent.joinall([
    gevent.spawn(func,"https://www.python.org/"),
    gevent.spawn(func, "https://www.yahoo.com/"),
    gevent.spawn(func, "https://github.com/")
])
print("异步cost:",time.time()-async_time)

<3>select 

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

 

Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

select 多并发socket(server端):

#!_*_coding:utf-8_*_
#__author__:"shikai"
import select,socket
import queue
#select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误
server=socket.socket()
server.bind(("localhost",1234))
server.listen()

server.setblocking(False)   #变成非阻塞状态
input_list=[server]   #第一个默认服务器端自己,新接收的连接
output_list=[]  # 要返回数据的连接
msg_dic={}
while True:
    readable,writeable,exceptional=select.select(input_list,output_list,input_list)   #监控input_list里有没有可连接的对象
    for r in readable:    #循环rlist新连接对象
        if r is server:
            conn,addr=server.accept()   #开始接收新连接
            input_list.append(conn)
            #新连接现在还没发送数据,如果现在接收就会报错,要想server知道客户端打的发没发数据
            #还需要select 再监控一次
            msg_dic[conn]=queue.Queue()   #s实例化一个队列接收数据
        else:  #此时遍历出的是新连接
            data=r.recv(1024)   #接收数据
            print("new conn:", conn)
            print("recv data:",data)
            print("--------------------------------")
            msg_dic[r].put(data)     #把要返回的数据添加到队列里
            output_list.append(r)     #把要返回数据的连接添加到select监控的列表
    for w in writeable:
        data_to_client=msg_dic[w].get()    #把数据从队列里get出来
        w.send(data)         #f发送数据给客户端
        output_list.remove(w)   #确保下次发送数据时没有该连接
    for e in exceptional:   #连接断开
        input_list.remove(e)
        del msg_dic[e]
        if e in output_list:
            output_list.remove(e)

server.close()

 

selectors 模块

它的功能与linux的epoll,还是select模块,poll等类似;实现高效的I/O multiplexing,  常用于非阻塞的socket的编程中。

selectors 多并发socket(server端):

#!_*_coding:utf-8_*_
#__author__:"shikai"
import socket,selectors
sel=selectors.DefaultSelector()  #实例化selectors对象
def accept(sock,mask):   #接收sock
    conn,addr=sock.accept()
    print("mask1:",mask)
    sel.register(conn,selectors.EVENT_READ,read)   #注册可read的conn对象执行read函数

def read(conn,mask):   #对conn执行接收和发送数据
    data=conn.recv(1024)
    if data:
        print("mask2:",mask)    # mask=1
        #print("recv data:",data)
        conn.send(data)
    else:
        sel.unregister(conn)   #从sel对象中注销断开连接的conn
        conn.close()
sock = socket.socket()
sock.bind(("localhost", 1234))
sock.listen()
sock.setblocking(False)  # 设置服务器端为非阻塞模式
sel.register(sock, selectors.EVENT_READ, accept)  # 注册一个可监听,read的socket对象

while True:
    event=sel.select()   #默认阻塞,有活动的连接就返回到活动列表
    for key,mask in event:
        calllback=key.data   #=accept函数
        calllback(key.fileobj,mask)   # fileobj=表示已经注册的文件对象

  

 

 

 




推荐阅读