multiprocessing模块支持本地和远程并发,并且通过多进程的方式成功避免 Global Interpreter Lock (GIL),因此该模块可以充分利用多处理器。
1. Process类
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
这个类表示运行在一个子进程中的任务。
target:是当进程启动时执行的被调函数。
args:传递给target的参数元组。
name:进程的名字,字符串。
Process的实例p具有一下方法:
(1) p.is_alive()
如果p仍然在运行,返回True.
(2) p.run()
进程启动时运行的方法。默认情况下,会调用传递给Process构造函数的target。定义进程的另一种方法是从Process类继承并重新实现run()函数
(3) p.start()
启动进程。这将运行代表进程的子进程,并调用该子进程中的p.run()函数
(4) p.terminate()
强制终止进程。如果调用此函数,进程p将被立即终止,同时不会进行任何清理动作。如果进程p创建了它自己的子进程,这些进程将会变为僵尸进程。
(5) p.daemon
一个布尔标志,指示进程是否是后台进程。当创建它的进程终止时,daemon为True的进程将自动终止。与线程不同的是,当daemon为False时,进程会等待子进程结束,然而Ctrl-C仍然有效。
(6) p.join([timeout])
等待进程p终止。timeout时可选的超时期限。进程可以被连接无数次,但如果连接自身则会出错。同样与线程不同,主进程等待子进程期间,Ctrl-C仍然有效
(7) p.name
进程的名字
(8) p.pid
进程的整数进程id
2. 进程间通信
multiprocessing提供两种进程间通信方式:Queues和Pipes。避免在进程间使用同步锁
(1)Queues
Queues类是 Queue.Queue 的相似克隆,该类是线程以及进程安全的。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() # prints "[42, None, 'hello']"; wait, if q is empty p.join()
Queues类返回的进程共享队列是通过pipe和一些锁/信号量实现的。当一个进程第一次将一个对象放入队列中时,一个feeder线程将会启动,它将该对象从buffer中转移到pipe中。
multiprocessing利用 Queue.Empty异常 和 Queue.Full异常来发出超时信号,注意要引入Queue包
Queue类实现了Queue.Queue类中的所有方法,除了task_done() 和 join()
Queue类实现的方法介绍:
- q.qsize()、q.empty()、和q.full()的返回值不可靠,不要使用
- q.put(obj[, block[, timeout]]) 将对象插入到队列中。如果block是True(默认)并且timeout是None(默认),那么进程阻塞直到队列中出现空闲位置;如果timeout是一个正数,那么进程至多阻塞timeout秒,当timeout秒后抛出Queue.Full异常。如果block是False,则忽略timeout,当队列有空闲位置时则插入对象,否则立即抛出Queue.Full异常。
- put_nowait(obj) 等价于put(obj, False)
- get([block[, timeout]]) 从队列中取出一个对象并返回该对象。如果block是True(默认)并且timeout是None(默认),那么阻塞进程知道队列中有一个可用对象;如果timeout是一个正数,进程至多阻塞timeout秒,当timeout秒后抛出Queue.Empty异常。如果block是False,则忽略timeout,当队列中有可用对象是则取出该对象,否则立即抛出Queue.Empty异常。
- get_nowait() 等价于get(False)
class multiprocessing.queues.SimpleQueue 是一个简化的Queue类型,它实现的方法如下:
- empty()当队列为空时返回True,否则返回False。
- get() 从队列中取出一项数据
- put(item)向队列中插入一项数据
class multiprocessing.JoinableQueue([maxsize]) 是Queue的子类,实现了task_done()和join():
- task_done()表示之前队列中的一项已经被处理。该函数应该被数据接收方调用。当接收方调用get()接收了队列中的一个数据后,可以调用task_done()以告知队列已接收的数据已经被处理。如果有一个调用join()的进程处于被阻塞状态,当队列中所有项都已经被处理,则该进程可以继续执行(即对于每一个被put()到队列中的项都已经被接收,且接收进程都调用了task_done())。如果task_done()被调用的次数大于被put()到队列中的项数,则抛出 ValueError异常。
- join() 阻塞调用join()的进程,直到队列中的所有项都已经被接收,且都已经被处理。当有项被put()到队列中时,则未完成的项数增加;当接收项的进程调用task_done()后,则未完成的项数减少。当未完成的项数减小至0,则调用join()的被阻塞进程可以继续执行。
(2)Pipes
Pipe([duplex])函数返回一个包含两个Connection对象的元组(conn1, conn2),这两个对象分别代表pipe的两端。如果duplex是True(默认),则代表这个pipe是双向的;如果duplex是False,则代表pipe是单向的:conn1仅能接收数据,conn2仅能发送数据。
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print parent_conn.recv() # prints "[42, None, 'hello']" p.join()
class multiprocessing.Connection实现了以下方法:
- send(obj) 发送数据,另一端应该用recv()接收。被传输的对象必须可被压缩,但压缩后的大小也不能过大,(大约32M+,根据操作系统的不同会有区别),否则会抛出 ValueError 异常。
- recv()接收send()发送的数据。如果没有可接收的数据则阻塞。当另一端已经关闭并且已经没有可接收数据时,则抛出EOFError异常。
- fileno() 返回这个connection的文件描述符或句柄。
- close() 关闭这个connection。当connection被回收时,系统会自动调用这个函数
- poll([timeout]) 检查pipe中是否还有可读数据,有则返回True;否则返回False。如果没有指定timeout,函数将立刻返回。如果timeout是一个正数,则timeout代表阻塞的最长秒数,如果在timeout之内pipe中有了可读数据,则返回True。如果timeout是None,则代表给timeout赋予了一个无穷大的数,如果在阻塞期间pipe中有了可读数据,则返回True。
- send_bytes(buffer[, offset[, size]])以字节形式发送一个对象,该对象需要支持buffer接口。如果指定了offset,则数据从offset开始发送。如果指定了size,则会发送size个字节。注意必须满足(buffer length >= offset + size),否则会抛出异常。如果buffer过大(大约32MB+,取决与操作系统),则会抛出 ValueError异常。
- recv_bytes([maxlength]) 以字符串的形式返回另一端发送的字节数据。如果没有可以接收的数据则阻塞。当另一端已经关闭并且已经没有可接收数据时,则抛出EOFError异常。如果指定了maxlength,并且maxlength小于待接收数据的长度,则抛出IOError异常,并且无法再使用该connection对象接收数据。
- recv_bytes_into(buffer[, offset])将待接收数据读入一个buffer中,并且返回读入数据的长度。如果没有可以接收的数据则阻塞。当另一端已经关闭并且已经没有可接收数据时,则抛出EOFError异常。buffer必须是一个可写的buffer接口。如果指定了offset,则数据从offset的位置开始写入buffer,offset必须是非负整数并且小于buffer的长度。如果buffer长度过小,则抛出BufferTooShort异常,此时,完整的待接收数据可以通过 e.args[0]获取,其中e是异常的实例。
>>> from multiprocessing import Pipe >>> a, b = Pipe() >>> a.send([1, 'hello', None]) >>> b.recv() [1, 'hello', None] >>> b.send_bytes('thank you') >>> a.recv_bytes() 'thank you' >>> import array >>> arr1 = array.array('i', range(5)) >>> arr2 = array.array('i', [0] * 10) >>> a.send_bytes(arr1) >>> count = b.recv_bytes_into(arr2) >>> assert count == len(arr1) * arr1.itemsize >>> arr2 array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
注意,当一个进程准备读写一个pipe时被杀死,此时pipe中的数据崩溃,因为无法获取该数据的边界位置。
3. multiprocessing实现的工具函数
(1)multiprocessing.active_children()
返回当前进程的所有活的子进程列表(不包括自己)。调用这个函数的副作用是,会join那些已经结束的子进程。
(2)multiprocessing.cpu_count()
返回系统中CPU的数量。可能引发 NotImplementedError异常
(3)multiprocessing.current_process()
返回当前进程对象