首页 > 技术文章 > python库(5)—— multiprocessing

zhangjing327 2013-12-30 18:46 原文

multiprocessing模块支持本地和远程并发,并且通过多进程的方式成功避免 Global Interpreter Lock (GIL),因此该模块可以充分利用多处理器。
1. Process类
  class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
  这个类表示运行在一个子进程中的任务。
  target:是当进程启动时执行的被调函数。
  args:传递给target的参数元组。
  name:进程的名字,字符串。
  Process的实例p具有一下方法:
  (1) p.is_alive()
    如果p仍然在运行,返回True.
  (2) p.run()
    进程启动时运行的方法。默认情况下,会调用传递给Process构造函数的target。定义进程的另一种方法是从Process类继承并重新实现run()函数
  (3) p.start()
    启动进程。这将运行代表进程的子进程,并调用该子进程中的p.run()函数
  (4) p.terminate()
    强制终止进程。如果调用此函数,进程p将被立即终止,同时不会进行任何清理动作。如果进程p创建了它自己的子进程,这些进程将会变为僵尸进程。
  (5) p.daemon
    一个布尔标志,指示进程是否是后台进程。当创建它的进程终止时,daemon为True的进程将自动终止。与线程不同的是,当daemon为False时,进程会等待子进程结束,然而Ctrl-C仍然有效。
  (6) p.join([timeout])
    等待进程p终止。timeout时可选的超时期限。进程可以被连接无数次,但如果连接自身则会出错。同样与线程不同,主进程等待子进程期间,Ctrl-C仍然有效
  (7) p.name
    进程的名字
  (8) p.pid
    进程的整数进程id

 

2. 进程间通信

  multiprocessing提供两种进程间通信方式:Queues和Pipes。避免在进程间使用同步锁

(1)Queues

  Queues类是 Queue.Queue 的相似克隆,该类是线程以及进程安全的。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"; wait, if q is empty
    p.join()

  Queues类返回的进程共享队列是通过pipe和一些锁/信号量实现的。当一个进程第一次将一个对象放入队列中时,一个feeder线程将会启动,它将该对象从buffer中转移到pipe中。

  multiprocessing利用 Queue.Empty异常 和 Queue.Full异常来发出超时信号,注意要引入Queue包

  Queue类实现了Queue.Queue类中的所有方法,除了task_done() 和 join()

  Queue类实现的方法介绍:

  • q.qsize()、q.empty()、和q.full()的返回值不可靠,不要使用
  • q.put(obj[block[timeout]]) 将对象插入到队列中。如果block是True(默认)并且timeout是None(默认),那么进程阻塞直到队列中出现空闲位置;如果timeout是一个正数,那么进程至多阻塞timeout秒,当timeout秒后抛出Queue.Full异常。如果block是False,则忽略timeout,当队列有空闲位置时则插入对象,否则立即抛出Queue.Full异常。
  • put_nowait(obj) 等价于put(obj, False)
  • get([block[timeout]]) 从队列中取出一个对象并返回该对象。如果block是True(默认)并且timeout是None(默认),那么阻塞进程知道队列中有一个可用对象;如果timeout是一个正数,进程至多阻塞timeout秒,当timeout秒后抛出Queue.Empty异常。如果block是False,则忽略timeout,当队列中有可用对象是则取出该对象,否则立即抛出Queue.Empty异常。
  • get_nowait() 等价于get(False)

  class multiprocessing.queues.SimpleQueue 是一个简化的Queue类型,它实现的方法如下:

  • empty()当队列为空时返回True,否则返回False。
  • get() 从队列中取出一项数据
  • put(item)向队列中插入一项数据

  class multiprocessing.JoinableQueue([maxsize]是Queue的子类,实现了task_done()和join():

  • task_done()表示之前队列中的一项已经被处理。该函数应该被数据接收方调用。当接收方调用get()接收了队列中的一个数据后,可以调用task_done()以告知队列已接收的数据已经被处理。如果有一个调用join()的进程处于被阻塞状态,当队列中所有项都已经被处理,则该进程可以继续执行(即对于每一个被put()到队列中的项都已经被接收,且接收进程都调用了task_done())。如果task_done()被调用的次数大于被put()到队列中的项数,则抛出 ValueError异常。
  • join() 阻塞调用join()的进程,直到队列中的所有项都已经被接收,且都已经被处理。当有项被put()到队列中时,则未完成的项数增加;当接收项的进程调用task_done()后,则未完成的项数减少。当未完成的项数减小至0,则调用join()的被阻塞进程可以继续执行。

(2)Pipes

  Pipe([duplex])函数返回一个包含两个Connection对象的元组(conn1, conn2),这两个对象分别代表pipe的两端。如果duplex是True(默认),则代表这个pipe是双向的;如果duplex是False,则代表pipe是单向的:conn1仅能接收数据,conn2仅能发送数据。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()   # prints "[42, None, 'hello']"
    p.join()

  class multiprocessing.Connection实现了以下方法:

  • send(obj发送数据,另一端应该用recv()接收。被传输的对象必须可被压缩,但压缩后的大小也不能过大,(大约32M+,根据操作系统的不同会有区别),否则会抛出 ValueError 异常。
  • recv()接收send()发送的数据。如果没有可接收的数据则阻塞。当另一端已经关闭并且已经没有可接收数据时,则抛出EOFError异常。
  • fileno(返回这个connection的文件描述符或句柄。
  • close(关闭这个connection。当connection被回收时,系统会自动调用这个函数
  • poll([timeout]检查pipe中是否还有可读数据,有则返回True;否则返回False。如果没有指定timeout,函数将立刻返回。如果timeout是一个正数,则timeout代表阻塞的最长秒数,如果在timeout之内pipe中有了可读数据,则返回True。如果timeout是None,则代表给timeout赋予了一个无穷大的数,如果在阻塞期间pipe中有了可读数据,则返回True。
  • send_bytes(buffer[offset[size]])以字节形式发送一个对象,该对象需要支持buffer接口。如果指定了offset,则数据从offset开始发送。如果指定了size,则会发送size个字节。注意必须满足(buffer length >= offset + size),否则会抛出异常。如果buffer过大(大约32MB+,取决与操作系统),则会抛出 ValueError异常。
  • recv_bytes([maxlength]以字符串的形式返回另一端发送的字节数据。如果没有可以接收的数据则阻塞。当另一端已经关闭并且已经没有可接收数据时,则抛出EOFError异常。如果指定了maxlength,并且maxlength小于待接收数据的长度,则抛出IOError异常,并且无法再使用该connection对象接收数据。
  • recv_bytes_into(buffer[offset])将待接收数据读入一个buffer中,并且返回读入数据的长度。如果没有可以接收的数据则阻塞。当另一端已经关闭并且已经没有可接收数据时,则抛出EOFError异常。buffer必须是一个可写的buffer接口。如果指定了offset,则数据从offset的位置开始写入buffer,offset必须是非负整数并且小于buffer的长度。如果buffer长度过小,则抛出BufferTooShort异常,此时,完整的待接收数据可以通过 e.args[0]获取,其中e是异常的实例。
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

  注意,当一个进程准备读写一个pipe时被杀死,此时pipe中的数据崩溃,因为无法获取该数据的边界位置。

 

3. multiprocessing实现的工具函数

(1)multiprocessing.active_children()

  返回当前进程的所有活的子进程列表(不包括自己)。调用这个函数的副作用是,会join那些已经结束的子进程。

(2)multiprocessing.cpu_count()

  返回系统中CPU的数量。可能引发 NotImplementedError异常

(3)multiprocessing.current_process()

  返回当前进程对象

 

推荐阅读