首页 > 技术文章 > python 核心编程 第十八章

jikeboy 2016-11-09 16:41 原文

python多线程编程

了解多线程

什么时候使用多线程:

  1. 任务是异步的,需要多个并发事务,各个事务的运行顺序可以不确定,随机的。
  2. 编程任务可以分成多个流,每个流一个确定的目标。
  3. 根据应用的不同,这些子任务可能都要计算出一个中间结果,用于合并得到最后的结果。
  4. python对于计算密集型的多线程编程提高速度的效果并不好。

什么是进程:
计算机程序只不过是磁盘中可执行的,二进制(或其它类型)的数据。它们只有在被读取到内
存中,被操作系统调用的时候才开始它们的生命期。进程(有时被称为重量级进程)是程序的一次
执行。每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。操作系
统管理在其上运行的所有进程,并为这些进程公平地分配时间。进程也可以通过 fork 和 spawn 操作
来完成其它的任务。不过各个进程有自己的内存空间,数据栈等,所以只能使用进程间通讯(IPC) ,
而不能直接共享信息。

什么是线程:
线程和进程相似,线程运行在同一个进程中,在相同的环境中运行,可共享信息。

python全局解释锁

Python 代码的执行由 Python 虚拟机(也叫解释器主循环)来控制。Python 在设计之初就考虑到
要在主循环中,同时只有一个线程在执行,就像单 CPU 的系统中运行多个进程那样,内存中可以存
放多个程序, 但任意时刻, 只有一个程序在 CPU 中运行。 同样地, 虽然 Python 解释器中可以 “运行”
多个线程,但在任意时刻,只有一个线程在解释器中运行。

在调用外部代码(如 C/C++扩展函数)的时候,GIL 将会被锁定,直到这个函数结束为止(由于
在这期间没有Python的字节码被运行, 所以不会做线程切换) 。 编写扩展的程序员可以主动解锁GIL

python 的threading模块

thread模块提供了基本的线程和锁的支持, 而threading
提供了更高级别,功能更强的线程管理的功能。Queue 模块允许用户创建一个可以用于多个线程之间
共享数据的队列数据结构。

thread模块:
函数 描述
thread 模块函数
start_new_thread(function,
args, kwargs=None) 产生一个新的线程,在新线程中用指定的参数和可选的kwargs 来调用这个函数。
allocate_lock() 分配一个 LockType 类型的锁对象
exit() 让线程退出
LockType 类型锁对象方法
acquire(wait=None) 尝试获取锁对象
locked() 如果获取了锁对象返回 True,否则返回 False
release() 释放锁

threading模块
threading 模块对象 描述
Thread 表示一个线程的执行的对象
Lock 锁原语对象(跟 thread 模块里的锁对象相同)
RLock 可重入锁对象。使单线程可以再次获得已经获得了的锁(递归锁定) 。
Condition 条件变量对象能让一个线程停下来, 等待其它线程满足了某个 “条件” 。如,状态的改变或值的改变。
Event 通用的条件变量。多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活。
Semaphore 为等待锁的线程提供一个类似“等候室”的结构
BoundedSemaphore 与 Semaphore 类似,只是它不允许超过初始值
Timer 与 Thread 相似,只是,它要等待一段时间后才开始运行。

三种使用Thread类创建实例的方法:

  1. 创建一个 Thread 的实例,传给它一个函数
  2. 创建一个 Thread 的实例,传给它一个可调用的类对象
  3. 从 Thread 派生出一个子类,创建一个这个子类的实例

表 18.3 Thread 对象的函数
函数 描述
start() 开始线程的执行
run() 定义线程的功能的函数(一般会被子类重写)
join(timeout=None) 程序挂起,直到线程结束;如果给了 timeout,则最多阻塞 timeout 秒
getName() 返回线程的名字
setName(name) 设置线程的名字
isAlive() 布尔标志,表示这个线程是否还在运行中
isDaemon() 返回线程的 daemon 标志
Edit By Vheavens
Edit By Vheavens
setDaemon(daemonic) 把线程的 daemon 标志设为 daemonic (一定要在调用 start()函数前调用)

例子1[创建一个Thread的实例,传给它一个函数]

import threading
from time import sleep, ctime
loops = [4, 2]

def loop(nloop, nsec):
    print "start loop", nloop, "at:", ctime()
    sleep(nsec)
    print "loop", nloop, "done at:", ctime()

def main():
    print "starting at:", ctime()
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print "all DONE at:", ctime()

if __name__ == '__main__':
    main()
    
输出:
start at: Sun Nov 06 09:02:49 2016
start loop 0 at:              Sun Nov 06 09:02:49 2016
start loop 1 at:              Sun Nov 06 09:02:49 2016
loop 1 done at:                Sun Nov 06 09:02:51 2016
loop 0 done at:                Sun Nov 06 09:02:53 2016
all DONE at: Sun Nov 06 09:02:53 2016

例子2[创建一个 Thread 的实例,传给它一个可调用的类对象]

import threading
from time import sleep, ctime
loops = [4, 2]

class ThreadFunc(object):
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args
    def __call__(self, *args, **kwargs):
        self.res = self.func(*self.args)

def loop(nloop, nsec):
    print "start loop", nloop, "at:             ", ctime()
    sleep(nsec)
    print"loop", nloop, "done at:               ", ctime()

def main():
    print "start at:", ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print"all DONE at:",ctime()

if __name__ == '__main__':
    main()

例子3[从Thread派生出一个子类,创建一个这个子类的实例]

import threading
from time import sleep, ctime
loops = [4, 2]

class ThreadFunc(threading.Thread):
    def __init__(self, func, args, name=''):
        super(ThreadFunc, self).__init__()
        self.name = name
        self.func = func
        self.args = args
    def run(self):
        self.func(*self.args)

def loop(nloop, nsec):
    print "start loop", nloop, "at:             ", ctime()
    sleep(nsec)
    print"loop", nloop, "done at:               ", ctime()

def main():
    print "start at:", ctime()
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = ThreadFunc(loop, (i, loops[i]), loop.__name__)
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print"all DONE at:",ctime()

if __name__ == '__main__':
    main()

斐波纳挈,阶乘和累加

from time import ctime, sleep
import threading
class MyThread(threading.Thread):
    def __init__(self, func, args, name = ""):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        print "starting ", self.name, "st:      ", ctime()
        self.res = self.func(*self.args)
        print self.name, "finished at:       ", ctime()

def fib(x):
    sleep(0.005)
    if x <2:return 1
    return (fib(x-2)+ fib(x-1))

def fac(x):
    sleep(0.1)
    if x < 2: return 1
    return (x *fac(x-1))

def sum(x):
    sleep(0.1)
    if x < 2:return 1
    return (x +sum(x-1))

funcs = [fib, fac, sum]
n = 12

def main():
    nfuncs = range(len(funcs))

    print "*** SINGLE THREAD"
    for i in nfuncs:
        print "starting", funcs[i].__name__, "at:      ", ctime()
        print funcs[i](n)
        print funcs[i].__name__,"finished at:         ", ctime()
        threads = []
    for i in nfuncs:
        t = MyThread(funcs[i], (n,), funcs[i].__name__)
        threads.append(t)
    for i in nfuncs:
        threads[i].start()
    for i in nfuncs:
        threads[i].join()
        print threads[i].getResult()
    print "all Done"

if __name__ == '__main__':
    main()

生产者-消费者问题和 Queue 模块

主要解决问题是:生产和消费要花费的时间无法确定
Queue 模块函数
queue(size) 创建一个大小为 size 的 Queue 对象
Queue 对象函数
qsize() 返回队列的大小(由于在返回的时候,队列可能会被其它线程修改,所以这个值是近似值)
empty() 如果队列为空返回 True,否则返回 False
full() 如果队列已满返回 True,否则返回 False
put(item,block=0) 把 item 放到队列中,如果给了 block(不为 0) ,函数会一直阻塞到队列中有
空间为止
get(block=0) 从队列中取一个对象,如果给了 block(不为 0) ,函数会一直阻塞到队列中有对象为止

from time import ctime, sleep
import threading
from Queue import Queue
from random import randint

class MyThread(threading.Thread):
    def __init__(self, func, args, name=""):
        super(MyThread, self).__init__()
        self.name = name
        self.func = func
        self.args = args
    def getResult(self):
        return self.res
    def run(self):
        print "starting",  self.name, "at:       ",ctime()
        self.res = self.func(*self.args)
        print self.name,"finished at :           ", ctime()

def writeQ(queue):
    print "producing object for Q..."
    queue.put("xxx", 1)
    print "size now\t", queue.qsize()
def readQ(queue):
    val = queue.get(1)
    print "consumed object from Q..size now\t", queue.qsize()
def writer(queue, loops):
    for i in range(loops):
        writeQ(queue)
        sleep(randint(1,3))
def reader(queue, loops):
    for i in range(loops):
        readQ(queue)
        sleep(randint(2, 5))
funcs = [writer, reader]
nfuncs = range(len(funcs))

def main():
    nloops = randint(2, 5)
    q = Queue(32)
    threads = []
    for i in nfuncs:
        t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
        threads.append(t)
    for i in nfuncs:
        threads[i].start()
    for i in nfuncs:
        threads[i].join()
    print "all DONE"

if __name__ == '__main__':
    main()
    
输出:
starting writer at:        Tue Nov 08 08:56:28 2016
producing object for Q...
startingsize now	 reader1 
at:        Tue Nov 08 08:56:28 2016
consumed object from Q..size now	0
producing object for Q...
size now	1
consumed object from Q..size now	0
producing object for Q...
size now	1
consumed object from Q..size now	0
producing object for Q...
size now	1
writer finished at :            Tue Nov 08 08:56:35 2016
consumed object from Q..size now	0
reader finished at :            Tue Nov 08 08:56:42 2016
all DONE

练习

18-4. 线程和文件。把练习9-19 的答案做一些改进。我们要得到一个字节值,一个文件名然后显示在文件中那个字节出现了多少次。假设这个文件非常的大。文件是可以有多个读者的,那我们就可以创建多个线程,每个线程负责文件的一部分。最后,把所有的线程的结果相加。使用timeit()对单线程和多线程分别进行计时,对性能的改进进行讨论。

import time
import threading
import re
f = open("hasdoc.txt").read()
def frequency(str, f, option = True):
    start = time.clock()
    str_regex = re.compile(str)
    number = len(str_regex.findall(f))
    end = time.clock()
    if option:
        print end - start
    return number
print frequency("in", f)
def MyThread(str, f, dgree=10):
    start = time.clock()
    f_list = f.split("\n", dgree)
    threads = []
    for i,j in enumerate(f_list):
        t = threading.Thread(target=frequency, args=(str, j, False))
        threads.append(t)
    for i in range(len(f_list)):
        threads[i].start()
    for i in range(len(f_list)):
        threads[i].join()
    end = time.clock()
    print end - start

MyThread("in", f)

最后的结果有些出乎我的意料,可能我的文件不够大,并不能显示出多线程的优势吧。

网络编程的内容暂时跳过

推荐阅读