首页 > 技术文章 > 并发编程:进程池,多线程。

zhangsanfeng 2018-04-26 15:10 原文

多线程相关知识:http://www.cnblogs.com/linhaifeng/articles/7428877.html

 

一 守护进程的应用:

其实还是在我们生产者与消费者的模型上加上守护进程的概念,使得我们的进程能够在任务执行完之后正常的退出。

import time
import random
from multiprocessing import Process,JoinableQueue     #我们在这里导入一个joinableQueue模块,

def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res))
        q.task_done()
def producer(name,q,food):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res='%s%s' %(food,i)
        q.put(res)
        print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res))
if __name__ == '__main__':
    q=JoinableQueue()
    p1=Process(target=producer,args=('egon',q,'包子'))
    p2=Process(target=producer,args=('刘清政',q,'泔水'))
    p3=Process(target=producer,args=('杨军',q,'米饭'))
    c1=Process(target=consumer,args=('alex',q))
    c2=Process(target=consumer,args=('梁书东',q))
    c1.daemon=True
    c2.daemon=True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()


    # 确定生产者确确实实已经生产完毕
    p1.join()
    p2.join()
    p3.join()
    # 在生产者生产完毕后,拿到队列中元素的总个数,然后直到元素总数变为0,q.join()这一行代码才算运行完毕
    q.join()
    # q.join()#一旦结束就意味着队列确实被取空,消费者已经确确实实把数据都取干净了
    print('主进程结束')
在所有的子进程结束之后,才会执行print(‘主进程结束’)

守护进程在多进程与多线程之间的区别:

# 守护:在进程与线程中的区别。:只要这个进程中没有可执行的代码。那么守护进程就结束。
# 在进程中,我们之前讲的主进程中只有一条主线程,所以在主进程(也就是主线程代码执行完毕后)执行完毕后
# 守护进程就结束。
# 那么在线程中:主线程结束之后还有非守护子线程,代码需要运行,而主线程会等着子线程的运行结束而结束,
# 也就是说这个进程中还有子线程没有运行完。所以会等到所有的非守护子进程进程都结束之后守护进程才会结束。

 下面我们通过一个实例来说明区别:

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")
输出结果为:
123
456
main
end123
end456
如果我们把sleep(1)换成sleep(5)也就是说让foo在主线程结束的时候还在sleep,那么就不会打印end123

 

二  进程池:

我们在使用Python的过程中,多进程是我们实现并发的手段之一,但是有几点问题需要注意:

  1. 很明显需要并发执行的任务通常要远大于核数
  2. 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
  3. 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数... 
ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

    创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

三 多线程:

那么我们要知道什么是线程:指的是一条流水线工作的过程。

其实前面我们说的进程并不是执行单位,进程实际上就是一个资源单位,真正执行的是它里面的线程。一个进程自带线程,在默认情况下自带一个主线程。线程才是在CPU上的执行单位。比如我们一个工厂,每开启一个车间就相当于开启一个进程,车间与车间之间的资源是相互独立的,那么同一个车间的多条流水线之间的资源是共享的。

我们之前说每开启一个进程其实是向操作系统发送一个请求,开辟一块内存空间,把代码以及数据copy进去,然后CPU再到内存空间中取,执行代码。那么线程的开启要比进程简单的多,几乎在发出请求的同时就能把一个‘’子线程‘’(为什么这里加引号呢?因为本身线程之间是没有主与子的,线程之间是地位相等的)造好。不用再重新申请内存空间。

  所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

  多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。

      1  比较进程与线程之间的差距:

           1)统一进程内的线程们共享该进程内的资源,不同进程内的线程资源可肯定是隔离的。

           2)创建线程的开销比创建进程要小的多

          3)线程之间没有主次

          4)主线程与主进程在等待子线程与子进程的区别:

               主线程等子线程结束是为了保证所有的子线程结束后再回收整个进程的资源。而主进程等待子进程结束是为了回收子进程的资源,线程之间是资源共享的,而进程之间是彼此相互隔离的。

下面我们看一个线程的实例:

from threading import Thread

def task(name):
    print('%s is running'%name)

if __name__ == '__main__':
    t=Thread(target=task,args=('feng',))
    t.start()
    print(''.center(50,'*'))
# 运行结果为:
# feng is running
************************主*************************
我们可以看出线程的产生是非常迅速的。

四 死锁与递归锁:

from threading import Thread,Lock,RLock
import time
# mutexA=Lock()
# mutexB=Lock()
mutexA=mutexB=RLock()
class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()
    def f1(self):
        mutexA.acquire()
        print('%s 拿到了A锁' %self.name)#拿到A锁就在A锁上加一
        mutexB.acquire()
        print('%s 拿到了B锁' %self.name)
        mutexB.release()
        mutexA.release()
    def f2(self):
        mutexB.acquire()
        print('%s 拿到了B锁' %self.name)
        time.sleep(0.1)
        mutexA.acquire()
        print('%s 拿到了A锁' %self.name)
        mutexA.release()
        mutexB.release()
if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
    # t1=MyThread()
    # t1.start()
    #
    # t2=MyThread()
    # t2.start()
    #
    # t3=MyThread()
    # t3.start()
    print('')

那么关于信号量与互斥锁的详细信息:一些底层原理。

http://url.cn/5DMsS9r

信号量与互斥锁的应用原理:

互斥锁:一个防止他人进入的简单方法,就是门口加一把锁。先到的人锁上门,后到的人看到上锁,就在门口排队,等锁打开再进去。
这就叫"互斥锁"(Mutual exclusion,缩写 Mutex),防止多个线程同时读写某一块内存区域。 信号量:还有些房间,可以同时容纳n个人,比如厨房。也就是说,如果人数大于n,多出来的人只能在外面等着。这好比某些内存区域,只能供给固定数目的线程使用。
这时的解决方法,就是在门口挂n把钥匙。进去的人就取一把钥匙,出来时再把钥匙挂回原处。后到的人发现钥匙架空了,就知道必须在门口排队等着了。
这种做法叫做"信号量"(Semaphore),用来保证多个线程不会互相冲突。 不难看出,mutex是semaphore的一种特殊情况(n=1时)。也就是说,完全可以用后者替代前者。但是,因为mutex较为简单,且效率高,

所以在必须保证资源独占的情况下,还是采用这种设计。

 互斥锁与join深度解析:

#不加锁:并发执行,速度快,数据不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加锁的代码并发运行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加锁的代码串行运行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同学可能有疑问:既然加锁会让运行变成串行,那么我在start之后立即使用join,就不用加锁了啊,也是串行的效果啊
#没错:在start之后立刻使用jion,肯定会将100个任务的执行变成串行,毫无疑问,最终n的结果也肯定是0,是安全的,但问题是
#start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的
#单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗时是多么的恐怖
'''
互斥锁与join区别

 

 

 

 


 

推荐阅读