一:上下文管理:
对于一些对象在使用之后,需要关闭操作的。比如说:socket、mysql数据库连接、文件句柄等。
都可以用上下文来管理。
语法结构:
1 Typical usage: 2 3 @contextmanager 4 def some_generator(<arguments>): 5 <setup> 6 try: 7 yield <value> 8 finally: 9 <cleanup> 10 11 This makes this: 12 13 with some_generator(<arguments>) as <variable>: 14 <body>
code:
1 import socket 2 import contextlib 3 4 5 @contextlib.contextmanager 6 def sock_server(host,port): 7 sk=socket.socket() 8 sk.bind((host,port)) 9 sk.listen(4) 10 try: 11 yield sk 12 finally: 13 sk.close() 14 15 with sock_server("127.0.0.1",22) as soc: 16 print(soc)
执行顺序:
解释:python从上到下依次解释:
1、当到with的时候,执行with内socket_server("127.0.0.1",22),跳到
2、被contextlib.contextmanager装饰的函数。
3、依次执行函数socket_server到yield 并把sk返回给第4步的sco变量
4、然后执行with下面的代码块,执行print语句。
5、当with语句的代码块执行完。跳到第3步的yeild。
6、执行finally语句里的代码块。
二:线程池(threadpool)
自己版本:
1 #!/bin/env python 2 #author:evil_liu 3 #date:2016-7-21 4 #description: thread pool 5 6 import threading 7 import time 8 import queue 9 10 class Thread_Poll: 11 ''' 12 功能:该类主要实现多线程,以及线程复用。 13 ''' 14 def __init__(self,task_num,max_size): 15 ''' 16 功能:该函数是初始化线程池对象。 17 :param task_num: 任务数量。 18 :param max_size: 线程数量。 19 :return:无。 20 ''' 21 self.task_num=task_num 22 self.max_size=max_size 23 self.q=queue.Queue(task_num)#设置任务队列的。 24 self.thread_list=[] 25 self.res_q=queue.Queue()#设置结果队列。 26 27 def run(self,func,i,call_back=None): 28 ''' 29 功能:该函数是线程池运行主函数。 30 :param func: 传入任务主函数。 31 :param *args: 任务函数参数,需要是元组形式。 32 :param call_back: 回调函数。 33 :return: 无。 34 ''' 35 if len(self.thread_list)<self.max_size:#如果目前线程数小于我们定义的线程的个数,进行创建。 36 self.creat_thread() 37 misson=(func,i,call_back)#往任务队列放任务。 38 self.q.put(misson) 39 40 def creat_thread(self): 41 ''' 42 功能:该函数主要是创建线程,并调用call方法。 43 :return: 无。 44 ''' 45 t=threading.Thread(target=self.call)#创建线程 46 t.start() 47 48 def call(self): 49 ''' 50 功能:该函数是线程循环执行任务函数。 51 :return: 无。 52 ''' 53 cur_thread=threading.currentThread 54 self.thread_list.append(cur_thread) 55 event=self.q.get() 56 while True: 57 func,args,cal_ba=event#获取任务函数。 58 try: 59 res=func(*args)#执行任务函数。注意参数形式是元组形式。 60 flag="OK" 61 except Exception as e: 62 print(e) 63 res=False 64 flag="fail" 65 self.res(res,flag)#调用回调函数,将执行结果返回到队列中。 66 try: 67 event=self.q.get(timeout=2)#如果任务队列为空,获取任务超时2s超过2s线程停止执行任务,并退出。 68 except Exception: 69 self.thread_list.remove(cur_thread) 70 break 71 def res(self,res,status): 72 ''' 73 功能:该方法主要是将执行结果方法队列中。 74 :param res: 任务函数的执行结果。 75 :param status: 执行任务函数的结果,成功还是失败。 76 :return: 无。 77 ''' 78 da_res=(res,status) 79 self.res_q.put(da_res) 80 81 def task(x,y): 82 ''' 83 功能:该函数主要需要执行函数。 84 :param x: 参数。 85 :return: 返回值1,表示执行成功。 86 ''' 87 print(x) 88 return x+y 89 def wri_fil(x): 90 ''' 91 功能:该函数主要讲结果队列中的结果写入文件中。 92 :param x: 任务长度。 93 :return: 无。 94 ''' 95 while True:#将执行结果,从队列中获取结果并将结果写入文件中。 96 time.sleep(1) 97 if pool.res_q.qsize()==x:#当队列当前的长度等于任务执行次数,表示任务执行完成。 98 with open('1.txt','w') as f1: 99 for i in range(pool.res_q.qsize()): 100 try: 101 data=pool.res_q.get(timeout=2) 102 f1.write('mission result:%s,status:%s\n'%data) 103 except Exception: 104 break 105 break 106 else: 107 continue 108 if __name__ == '__main__': 109 pool=Thread_Poll(10,5)#初始化线程池对象。 110 for i in range(10):#循环任务。 111 pool.run(task,(1,2)) 112 wri_fil(10)
老师版本:注意老师在创建线程的时候,如果此时任务队列中没有任务的时候,不会创建其他线程。在线程执行完任务之后,将线程加入空闲线程的列表中,然后让当前线程去队列里获取任务,利用queue里的get()方法阻塞的作用的,如果一直阻塞的话,
然后表示空闲的列表中的加入的线程 一直有,此时表示创建线程数已经满足任务需求,如果不阻塞则空闲线程列表里没有空余线程。而是获取任务,执行任务。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import queue 5 import threading 6 import contextlib 7 import time 8 9 StopEvent = object() 10 11 12 class ThreadPool(object): 13 14 def __init__(self, max_num, max_task_num = None): 15 if max_task_num: 16 self.q = queue.Queue(max_task_num) 17 else: 18 self.q = queue.Queue() 19 self.max_num = max_num 20 self.cancel = False 21 self.terminal = False 22 self.generate_list = [] 23 self.free_list = [] 24 25 def run(self, func, args, callback=None): 26 """ 27 线程池执行一个任务 28 :param func: 任务函数 29 :param args: 任务函数所需参数 30 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) 31 :return: 如果线程池已经终止,则返回True否则None 32 """ 33 if self.cancel: 34 return 35 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 36 self.generate_thread() 37 w = (func, args, callback,) 38 self.q.put(w) 39 40 def generate_thread(self): 41 """ 42 创建一个线程 43 """ 44 t = threading.Thread(target=self.call) 45 t.start() 46 47 def call(self): 48 """ 49 循环去获取任务函数并执行任务函数 50 """ 51 current_thread = threading.currentThread() 52 self.generate_list.append(current_thread) 53 54 event = self.q.get() 55 while event != StopEvent: 56 57 func, arguments, callback = event 58 try: 59 result = func(*arguments) 60 success = True 61 except Exception as e: 62 success = False 63 result = None 64 65 if callback is not None: 66 try: 67 callback(success, result) 68 except Exception as e: 69 pass 70 71 with self.worker_state(self.free_list, current_thread): 72 if self.terminal: 73 event = StopEvent 74 else: 75 event = self.q.get() 76 else: 77 78 self.generate_list.remove(current_thread) 79 80 def close(self): 81 """ 82 执行完所有的任务后,所有线程停止 83 """ 84 self.cancel = True 85 full_size = len(self.generate_list) 86 while full_size: 87 self.q.put(StopEvent) 88 full_size -= 1 89 90 def terminate(self): 91 """ 92 无论是否还有任务,终止线程 93 """ 94 self.terminal = True 95 96 while self.generate_list: 97 self.q.put(StopEvent) 98 99 self.q.queue.clear() 100 101 @contextlib.contextmanager 102 def worker_state(self, state_list, worker_thread): 103 """ 104 用于记录线程中正在等待的线程数 105 """ 106 state_list.append(worker_thread) 107 try: 108 yield 109 finally: 110 state_list.remove(worker_thread) 111 112 113 114 # How to use 115 116 117 pool = ThreadPool(5) 118 119 def callback(status, result): 120 # status, execute action status 121 # result, execute action return value 122 pass 123 124 125 def action(i): 126 print(i) 127 128 for i in range(30): 129 ret = pool.run(action, (i,), callback) 130 131 time.sleep(5) 132 print(len(pool.generate_list), len(pool.free_list)) 133 print(len(pool.generate_list), len(pool.free_list)) 134 # pool.close() 135 # pool.terminate()