首页 > 技术文章 > 【Python之路】特别篇--Python线程池

5poi 2017-01-11 14:13 原文

  版本一:用队列中的“令牌”限制同时运行的线程数(示例为 Python 2 语法):

#!/usr/bin/env python
# -*- coding:utf-8 -*-
try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to ``queue``
import threading


class ThreadPool(object):
    """Cap the number of concurrently running threads with a token queue.

    The pool does not keep threads alive. It pre-fills a bounded queue with
    ``max_num`` references to the ``threading.Thread`` class; each reference
    acts as a permit. Callers take a permit with :meth:`get_thread`, build
    and start a thread from it, and hand a permit back with
    :meth:`add_thread` when the work is done.
    """

    def __init__(self, max_num=20):
        """Create the pool and fill it with ``max_num`` permits."""
        self.queue = Queue.Queue(max_num)
        # ``range`` instead of the Python-2-only ``xrange`` so the class
        # runs unchanged on both Python 2 and Python 3.
        for _ in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        """Take one permit; blocks while the queue is empty.

        Returns the ``threading.Thread`` class itself (not an instance), so
        the caller is expected to instantiate and start it.
        """
        return self.queue.get()

    def add_thread(self):
        """Return one permit to the pool.

        Blocks if the queue already holds ``max_num`` permits, because the
        underlying queue is bounded.
        """
        self.queue.put(threading.Thread)

"""
pool = ThreadPool(10)

def func(arg, p):
    print arg
    import time
    time.sleep(2)
    p.add_thread()


for i in xrange(30):
    thread = pool.get_thread()
    t = thread(target=func, args=(i, pool))
    t.start()
"""
版本一

  版本二:可复用工作线程的线程池,任务通过队列分发给工作线程(Python 3 语法):

#!/usr/bin/env python
# -*-coding:utf-8 -*-


import threading
import time
import queue
import contextlib

# Sentinel placed on the task queue to tell a worker thread to exit.
StopEvent = object()


class Threading(object):
    """Thread pool that reuses at most ``maxthread`` worker threads.

    Tasks are ``(func, args, callback)`` tuples placed on an unbounded queue.
    Workers are created lazily by :meth:`run` and keep pulling tasks until
    they receive the ``StopEvent`` sentinel or the pool is terminated.

    :meth:`run`, :meth:`close` and :meth:`terminate` are written for a
    single submitting thread; no lock guards the bookkeeping lists.
    """

    def __init__(self,maxthread):
        # Task queue
        self.q = queue.Queue()
        # Maximum number of worker threads
        self.MaxThread = maxthread
        # Workers currently idle, i.e. waiting for their next task
        self.free_thread = []
        # Workers that have been created and have not exited yet
        self.generate_list = []
        # Set by terminate(); workers exit after their current task
        self.terminal = False

    def run(self, func, args, callback=None):
        """Queue one task and start a new worker if that is useful.

        ``func`` is called as ``func(args)`` (``args`` is passed as a single
        positional argument). If ``callback`` is given it is called as
        ``callback(status, ret)`` where ``status`` is ``True`` and ``ret`` is
        the return value on success, or ``status`` is ``False`` and ``ret``
        is the raised exception on failure.
        """
        self.q.put((func, args, callback))
        # Only grow the pool when nobody is idle and the cap is not reached.
        if not self.free_thread and len(self.generate_list) < self.MaxThread:
            self.create_thread()

    def create_thread(self):
        """Start one worker thread running :meth:`call`."""
        t = threading.Thread(target=self.call)
        # Register in the creating thread, before start(): if the worker
        # registered itself asynchronously, run() could overshoot MaxThread
        # and close() could queue too few StopEvent sentinels.
        self.generate_list.append(t)
        t.start()

    def call(self):
        """Worker loop: execute queued tasks until told to stop."""
        current_thread = threading.current_thread()
        # Workers started by create_thread() are already registered; this
        # covers call() being run on a thread created some other way.
        if current_thread not in self.generate_list:
            self.generate_list.append(current_thread)

        try:
            event = self.q.get()
            # The sentinel is compared by identity, never by equality.
            while event is not StopEvent:
                func, args, callback = event
                status = True

                try:
                    ret = func(args)
                except Exception as e:
                    status = False
                    ret = e

                if callback is not None:
                    try:
                        callback(status, ret)
                    except Exception:
                        # A failing callback must not kill the worker, but
                        # it should not disappear without a trace either.
                        import logging
                        logging.getLogger(__name__).exception(
                            "thread pool callback raised")

                if self.terminal:
                    event = StopEvent
                else:
                    # Advertise this worker as idle while it waits.
                    with self.worker_state(self.free_thread, current_thread):
                        event = self.q.get()
        finally:
            # Deregister even if the worker dies abnormally.
            self.generate_list.remove(current_thread)

    def close(self):
        """Let workers exit once all already-queued tasks are done.

        One sentinel per live worker is appended behind the pending tasks.
        This method does not wait for the workers to finish.
        """
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """Stop the pool: drop pending tasks and wait for workers to exit.

        Tasks that are already executing are allowed to finish. On return
        the task queue is empty.
        """
        self.terminal = True
        self._drain_queue()

        workers = list(self.generate_list)
        # One sentinel per worker wakes every worker blocked on q.get();
        # busy workers exit via the ``terminal`` flag instead.
        for _ in workers:
            self.q.put(StopEvent)

        me = threading.current_thread()
        for worker in workers:
            # Never join ourselves (terminate() called from a callback) and
            # never join a thread that is not running.
            if worker is not me and worker.is_alive():
                worker.join()

        # Remove sentinels that busy workers never consumed.
        self._drain_queue()

    def _drain_queue(self):
        """Discard everything currently on the task queue."""
        try:
            while True:
                self.q.get_nowait()
        except queue.Empty:
            pass

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """Keep ``worker_thread`` in ``state_list`` for the ``with`` body."""
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

def action(i):
    """Demo task: pause for half a second, then print ``i``."""
    delay = 0.5
    time.sleep(delay)
    print(i)


def callback(status, ret):
    """Demo completion hook: print the task's status flag and its result."""
    outcome = (status, ret)
    print(*outcome)


if __name__ == '__main__':
    # Demo: push 50 jobs through a pool capped at 10 worker threads.
    pool = Threading(10)

    for i in range(50):
        pool.run(func=action, args=i)

    # close() queues the stop sentinels behind the pending jobs.
    # Alternative shutdown kept from the original: pool.terminate()
    pool.close()
版本二

 

推荐阅读