首页 > 技术文章 > socket_server源码剖析、python作用域、IO多路复用

evilliu 2016-07-14 17:24 原文

本节内容:

课前准备知识:

函数嵌套函数的使用方法:

我们在使用函数嵌套函数的时候,是学习装饰器的时候,出现过,由一个函数返回值是一个函数体情况。

我们在使用函数嵌套函数的时候,最好也这么写。

1 def test():
2     name=2
3     def test1():
4         print(name)
5     return test1
6 
7 
8 test()()

通过返回值调用嵌套函数。或者你想执行函数test1的函数体,也可以如下操作但是不建议这么做。

1 def test():
2     name=2
3     def test1():
4         print(name)
5     test1()
6 
7 test()

一:作用域:

    a)在python中无块级作用域。在JavaScript 和python一样,没有块作用域,在C#和java中有块级作用域,变量只能在一个代码块中生效。

1 for i in range(9):
2     name=i
3 print(name)
4 8

 

1 if True:
2     name='ok'
3 print(name)
4 ok

 

b)python有函数作用域。外部不能调用函数内的变量。
1 def test():
2     name=2
3 print(name)##name拿不到变量name的值。

 

即时函数执行也拿不到
1 def test():
2     name=2
3 test()
4 print(name)

c)作用域链:函数从自己的作用域由内往外(上)找直至找到变量,如果没有会报错。

 

 1 name=1
 2 def test():
 3     name=2
 4     def test1():
 5         name=3
 6         print(name)
 7     return test1
 8 
 9 test()()
10 3 
 1 name=1
 2 def test():
 3     name=2
 4     def test1():
 5         print(name)
 6     return test1
 7 
 8 
 9 test()()
10 2
1 name=1
2 def test():
3     def test1():
4         print(name)
5     return test1
6 
7 
8 test()()
9 1

如上3个例子,说明函数是从自己的函数域找相应的变量,如果没有就外层函数找,外层函数没有,会去上层找,直至找到为止。

调用内部函数的变量由作用域1 到作用域2 到作用域3的顺序查找。

d)python在执行前作用域已经确定了,当解释器从上到下对代码的加载的时候,作用域已经确定了。所以在别的函数调用其他函数的时候,作用域需要去原先的函数作用域中找。

 

 1 def test():
 2     name='tom'
 3     print(name)
 4 
 5 
 6 def  func():
 7     name='evil'
 8     test()
 9 
10 
11 func()
12 tom

e)列表的推到式。

列表生成:

 

1 li=[x for x  in range(6) if x >2]##由后面的for循环if选值生成列表。for前面是生成什么样的列表。
2 print(li)
3 [3, 4, 5]
li=[lambda :x for x in range(5)]
易错点:函数没被调用,只会生成提一些列函数,而变量x是形参。而不是实参。
根据列表的推导式,生成没有被执行的lambda表达式,也就是函数。也就是说li里的对象是函数对象。由于只有调用函数的时候才能执行函数体有。所以li[0]()在执行的时候,由于变量x在函数里内没有赋值,会在函数体外面找,外面x的值已经为4.
1 li=[lambda :x for x in range(5)]
2 print(li)
3 ret=li[1]()
4 print(ret)
5 
6 [<function <listcomp>.<lambda> at 0x0000000000A91510>, <function <listcomp>.<lambda> at 0x0000000000A91730>, <function <listcomp>.<lambda> at 0x0000000000A917B8>, <function <listcomp>.<lambda> at 0x0000000000A91840>, <function <listcomp>.<lambda> at 0x0000000000A918C8>]
7 4

 


1 li=[]
2 for x in range(5):
3     def f1():
4         return x
5     li.append(f1)
6 
7 print(li[0]())
8 print(li[1]())
9 print(li[2]())
 如上代码在解释器执行的时候,从上到下,到函数f1 的时候, 没有执行return x的操作,所以生成列表是带有形参的x的函数对象。
本质上就是是否执行。
 1 li=[]
 2 for x in range(5):
 3     def f1(x=x):
 4         return x
 5     li.append(f1)
 6 
 7 print(li[0]())
 8 print(li[1]())
 9 print(li[2]())
10 0
11 1
12 2
解释器从上到下,加载的时候,遇到函数的时候,只是把函数名字以及函数参数执行(f1(x=x)),函数体并不执行。
二:类在2.5的多继承。

说明:F 是继承D、E,然后D继承B E继承C B和C继承A的关系。
在python2中,如果最顶层的类(A)不继承(object)那么子类F找自己没有的方法是从自己到左边一直到A 然后在右边到C
在python3中所有的类默认继承object,如果在python2中最顶层的类中继承object的情况下的话,在子类找相应的方法的顺序和3中是一样的。
三:IO多路复用:
用socketserver写的程序,是由事件驱动程序,事件是:接收和发送请求。
传统编程和事件驱动程序的对比:

开始--->代码块A--->代码块B--->代码块C--->代码块D--->......--->结束


每一个代码块里是完成各种各样事情的代码,但编程者知道代码块A,B,C,D...的执行顺序,唯一能够改变这个流程的是数据。输入不同的数据,根据条件语句判断,流程或许就改为A--->C--->E...--->结束。每一次程序运行顺序或许都不同,但它的控制流程是由输入数据和你编写的程序决定的。如果你知道这个程序当前的运行状态(包括输入数据和程序本身),那你就知道接下来甚至一直到结束它的运行流程。


 对于事件驱动型程序模型,它的流程大致如下:


开始--->初始化--->等待


 与上面传统编程模式不同,事件驱动程序在启动之后,就在那等待,等待什么呢? 等待被事件触发。传统编程下也有“等待”的时候,比如在代码块D中,你定义了一个input(),需要用户输入数据。但这与下面的等待不同,传统编程的 “等待”,比如input(),你作为程序编写者是知道或者强制用户输入某个东西的,或许是数字,或许是文件名称,如果用户输入错误,你还需要提醒他,并 请他重新输入。事件驱动程序的等待则是完全不知道,也不强制用户输入或者干什么。只要某一事件发生,那程序就会做出相应的“反应”。这些事件包括:输入信 息、鼠标、敲击键盘上某个键还有系统内部定时器触发。

 如上图所示,当客户端请求发送到服务端,服务端通过IO多路复用来监控服务端的socket的变化,来接收和发送请求,而server运行咱们自定义的handle方法来执行咱们的代码,并无限死循环执行下去,来抓取客户端的请求和发送给客户端请求。

  四:IO多路复用(input\outp):

IO操作:我理解是:可以是一次磁盘的读写,也可以是一次请求处理。

IO多路复用实现:

通过select 、poll 、epoll来实现。

那在socketserver是如何通过IO多路复用来实现,不停的抓取客户请求以及收发客户端消息呢?

IO多路复用通过监听socketserver服务器的socket对象内部是否发生变化来实现多并发处理客户端请求。

那在什么情况下,socket对象发生变化呢?

1:服务端socket对象创建新的连接(创建新的socket对象来和客户端进行通信。)

2:和客户端socket进行通信的socket对象收发消息。

也就是说,服务端socket对象发生一次IO请求的时候,就发生一次变化。通过这个变化来实现多并发处理客户端请求。

通过select模式来实现简单的多并发请求:

服务器端:

 1 import socket
 2 import select
 3 BUFF_SIZE=1024
 4 IP_PORT=('0.0.0.0',9999)
 5 SO=socket.socket()
 6 SO.bind(IP_PORT)
 7 SO.listen(5)
 8 while True:
 9     rlist,w,e=select.select([SO,],[],[],1)
10     print(rlist)
rlist,w,e=select.select([SO,],[],[],1) 
通过select模块,给select函数传入参数,传入参数为3个列表,最后数字1,表示超时时间,表示select监听socket的变化超时为1秒,第一个形参添加是服务器socket的SO的变化,该函数的返回值是三个分别被rlist,w,e分别接收,而且返回值对象为一个列表。
当客户端没有请求的时候,rlist的列表为[],当有返回值的时候,rlist列表为socket对象的列表。类似为:[<socket.socket fd=224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 9999)>]
客户端:
1 import  socket
2 S=socket.socket()
3 S.connect(('127.0.0.1',9999))

简单的发送消息,看下是否有阻塞问题,如果没有select的话  socket服务器在同一时间只能接收一个请求。

服务端:

 1 import socket
 2 import select
 3 BUFF_SIZE=1024
 4 IP_PORT=('0.0.0.0',9999)
 5 SO=socket.socket()
 6 SO.bind(IP_PORT)
 7 SO.listen(5)
 8 while True:
 9     rlist,w,e=select.select([SO,],[],[],1)
10     print(len(rlist))
11     for i in rlist:
12         conn,addr=i.accept()
13         conn.sendall(bytes('OK',encoding='utf-8'))

客户端:

1 import  socket
2 S=socket.socket()
3 S.connect(('127.0.0.1',9999))
4 while True:
5     data=S.recv(1024)
6     print(data)
7     S=input('>')#用input等待用户输入来保持连接存在。
8     S.close() 

效果:

实验一:

服务端:

客户端:

 

由以上实验,可以看到通过select模式监听客户端请求的时候,客户端的请求并没有出现阻塞。而服务端的rlist的列表,当监听的socket发生的变化的时候,rlist的长度为1,如果建立好连接的时候,rlist的列表的长度为0

如果服务端没有建立连接是什么情况的呢?
实验二:

由上图的所做的实验一和实验二,可以得到如下结论:
1:可以通过IO多路复用select模式,可以并发接收客户端请求。

2:在select模式的发现客户端请求的时候,如果服务端没去建立连接select的rlist列表,会一直存在这个变化的socket。
3:服务端和客户端创建连接的时候,rlist列表会自动去除该已经建立连接的socket对象。开始监听下一个客户端的请求。 

 服务端的for循环一直在循环,只有当rlist的列表不为空的时候,才执行for循环的循环体。

上面的例子只是监控服务端的socket的,而客户端的socket的并没有加入监听。下面把和客户端的通信的socket加入监控中。

服务端:

 1 import socket
 2 import select
 3 BUFF_SIZE=1024
 4 IP_PORT=('0.0.0.0',9999)
 5 SO=socket.socket()
 6 SO.bind(IP_PORT)
 7 SO.listen(5)
 8 
 9 NEW_LIST=[SO]
10 while True:
11     rlist,w,e=select.select(NEW_LIST,[],[],1)
12     print(len(rlist),len(NEW_LIST))
13     for i in rlist:
14         if i==SO:##如果是新的连接的话,建立连接。
15             conn,addr=i.accept()
16             conn.sendall(bytes('OK',encoding='utf-8'))
17             NEW_LIST.append(conn)##把跟客户端的建立连接的socket添加到监听的cocket的列表中。
18         else:#处理客户端的socket发生变化的进行接收消息和发送消息。
19             i.recv(1024)
20             i.sendall(bytes('back',encoding='utf-8'))

 

客户端:

1 import  socket
2 S=socket.socket()
3 S.connect(('127.0.0.1',9999))
4 while True:
5     data=S.recv(1024)
6     print(data)
7     inp=input('>')
8     S.sendall(bytes(inp,encoding='utf-8'))

注意上面的服务端 我们是把建议连接之后,把服务端的跟客户端通信的conn append到列表中。
在select模式中,如果服务端的socket发生变化的话 ,会把这个socket对象加入到rlist列表中,如果有服务端对该socket做出了相应的响应(IO操作)rlist会把该socket对象删除掉。
如果服务器没有对变化的socket做出回应,rlist会一直有该socket对象!!!

如下所示:

服务端:

客户端:

服务端在建立连接之后,收发一次消息之后,客户端在发一次消息的时候,服务端没有响应。所以rlist一直有这个socket对象。也就是是告诉服务器端有一次(IO)请求未处理。

如上:通过for循环来依次处理客户端的请求,其实是一个伪并发。实际socketserver内部也是这么处理请求。

如果客户端断开连接,由于监听的NEW_LIST是我们程序进行append,在断开连接的时候,需要把相应跟客户端进行通信的socket进行remove。

服务端:

 

 1 import socket
 2 import select
 3 BUFF_SIZE=1024
 4 IP_PORT=('0.0.0.0',9999)
 5 SO=socket.socket()
 6 SO.bind(IP_PORT)
 7 SO.listen(5)
 8 
 9 NEW_LIST=[SO]
10 while True:
11     rlist,w,e=select.select(NEW_LIST,[],[],1)
12     print(len(rlist),len(NEW_LIST))
13     for i in rlist:
14         if i==SO:##如果是新的连接的话,建立连接。
15             conn,addr=i.accept()
16             conn.sendall(bytes('OK',encoding='utf-8'))
17             NEW_LIST.append(conn)##把跟客户端的建立连接的socket添加到监听的cocket的列表中。
18         else:#处理客户端的socket发生变化的进行接收消息和发送消息。
19             try:
20                 data=i.recv(1024)
21                 i.sendall(bytes('back',encoding='utf-8'))
22             except Exception :
23                 NEW_LIST.remove(i)##断开连接我们需要把对应的socket对象从我们监控列表中删除掉。
24                 continue 

 在服务器断开连接的时候,默认客户端会发送一个空(这个没测出来,有待商榷。测试的时候并没有发送一个空)

 

 1 import socket
 2 import select
 3 BUFF_SIZE=1024
 4 IP_PORT=('0.0.0.0',9999)
 5 SO=socket.socket()
 6 SO.bind(IP_PORT)
 7 SO.listen(5)
 8 
 9 NEW_LIST=[SO]
10 while True:
11     rlist,w,e=select.select(NEW_LIST,[],[],1)
12     print(len(rlist),len(NEW_LIST))
13     for i in rlist:
14         if i==SO:##如果是新的连接的话,建立连接。
15             conn,addr=i.accept()
16             conn.sendall(bytes('OK',encoding='utf-8'))
17             NEW_LIST.append(conn)##把跟客户端的建立连接的socket添加到监听的cocket的列表中。
18         else:#处理客户端的socket发生变化的进行接收消息和发送消息。
19             try:
20                 data=i.recv(1024)
21                 if  not data:
22                     print('oop')
23                     raise Exception ('connect is over!')##断开并没有执行该语句。
24                 i.sendall(bytes('back',encoding='utf-8'))
25             except Exception as e:
26                 print(e)
27                 NEW_LIST.remove(i)
28                 continue

实现读写分离:

服务端:

 1 import socket
 2 import select
 3 BUFF_SIZE=1024
 4 IP_PORT=('0.0.0.0',9999)
 5 SO=socket.socket()
 6 SO.bind(IP_PORT)
 7 SO.listen(5)
 8 
 9 INPUTS=[SO]
10 OUPUTS=[]
11 while True:
12     rlist,wlist,e=select.select(INPUTS,OUPUTS,[],1)
13     #wlist列表表示谁给我发送过消息,outputs有值,wlist就有值。
14     print(len(rlist),len(INPUTS),len(OUPUTS),len(wlist))
15     for i in rlist:
16         if i==SO:##如果是新的连接的话,建立连接。
17             conn,addr=i.accept()
18             INPUTS.append(conn)##把跟客户端的建立连接的socket添加到监听的cocket的列表中。
19         else:#处理客户端的socket发生变化的进行接收消息和发送消息。
20             try:
21                 data=i.recv(1024)
22                 if  not data:
23                     raise Exception ('connect is over!')##断开并没有执行该语句。
24                 else:
25                     OUPUTS.append(i)#把给服务端发送过消息的加入这个列表。
26             except Exception as e:
27                 print(e)
28                 INPUTS.remove(i)
29                 continue
30     for w in wlist:
31         w.sendall(bytes('send again',encoding='utf-8'))
32         OUPUTS.remove(w)#发送完消息,需要把该socket对象移除,否则会无限发送消息。

 客户端:

1 import  socket
2 S=socket.socket()
3 S.connect(('127.0.0.1',9999))
4 while True:
5     inp=input('>')
6     S.sendall(bytes(inp,encoding='utf-8'))
7     data=S.recv(1024)
8     print(data)
9 S.close()

 鉴于上次客户端发的消息内容,回复客户端相同的内容。

 1 import socket
 2 import select
 3 BUFF_SIZE=1024
 4 IP_PORT=('0.0.0.0',9999)
 5 SO=socket.socket()
 6 SO.bind(IP_PORT)
 7 SO.listen(5)
 8 MSG={}#接收消息的字典。
 9 INPUTS=[SO]
10 OUPUTS=[]
11 while True:
12     rlist,wlist,e=select.select(INPUTS,OUPUTS,[],1)
13     #wlist列表表示谁给我发送过消息,outputs有值,wlist就有值。
14     print(len(rlist),len(INPUTS),len(OUPUTS),len(wlist))
15     for i in rlist:
16         if i==SO:##如果是新的连接的话,建立连接。
17             conn,addr=i.accept()
18             INPUTS.append(conn)##把跟客户端的建立连接的socket添加到监听的cocket的列表中。
19         else:#处理客户端的socket发生变化的进行接收消息和发送消息。
20             try:
21                 data=i.recv(1024)
22                 if  not data:
23                     raise Exception ('connect is over!')##断开并没有执行该语句。
24                 else:
25                     OUPUTS.append(i)#把给服务端发送过消息的加入这个列表。
26                     MSG[i]=[]
27                     MSG[i].append(data)##把客户端发送的消息添加到MSG字典当中。
28             except Exception as e:
29                 print(e)
30                 INPUTS.remove(i)
31                 del MSG[i]##在断开连接的时候,需要该socket的所有消息去掉。
32                 continue
33     for w in wlist:
34         msg=MSG[w].pop()
35         send_msg=msg+bytes('send again',encoding='utf-8')
36         w.sendall(send_msg)
37         OUPUTS.remove(w)#发送完消息,需要把该socket对象移除,否则会无限发送消息。 
1 import  socket
2 S=socket.socket()
3 S.connect(('127.0.0.1',9999))
4 while True:
5     inp=input('>')
6     S.sendall(bytes(inp,encoding='utf-8'))
7     data=S.recv(1024)
8     print(data)
9 S.close()

 epoll、select、poll:三种都是系统底层检测socket变化。

select是Windows和linux都有的一种IO多路复用的模式。

select:

1:多平台使用。

2:内部维护的一个for循环来监听服务器端socket的变化,效率很低

3:select监听socket对象有个数的限制(1024个。)

poll:

1:内部还是for循环实现监听服务端socket的变化。

2:监听的对象个数没有限制。

epoll:

内部不在改变使用for循环使用。而是通过客户端的socket发生变化,主动告知epoll发生变化,效率更高。nginx就是采用这种模式处理IO。

这三种模式,可以检测不仅仅是检测socket对象,可以检测所有的IO操作(不支持文件操作),比如输入终端(键盘、crt等)、

 

五:socketserver源码剖析:

  

 

                            5-1

socketserver工作原理:

1:创建socket,通过IO多路复用(select)来监听服务器端socket的变化(新连接的建立、建立socket和客户端进行通信)。

2:创建多个线程处理客户端请求,在处理的过程执行我们定义的handle方法。

如上是一个请求的连接的进入和处理整个socketserver的处理流程。

代码:

 1 import  socketserver
 2 class Myclass(socketserver.BaseRequestHandler):
 3     def handle(self):
 4         pass
 5     
 6     
 7     
 8     
 9 if __name__=='__main__':
10     server=socketserver.ThreadingTCPServer(('0.0.0.0',9999),Myclass)
11     server.serve_forever()

 首先来看第10行,第10行初始化ThreadingTCPServer一个对象,传入2个参数('0.0.0.0',9999)、Myclass。

首先需要执行ThreadingTCPServer的__init__()构造方法,由图5-1可以看到,只有TCPServer和BaseServer有构造方法,如下:

TCPServer构造方法:

由上可以看出在执行TCPServer的同时,执行他的父类BaseServer的__init__()构造方法。传入RequestHandlerClass类,这里RequestHandlerClass=Myclass(我们自定义的类。)

如下 是BaseServer的__init__()构造方法:

可以看出BaseServer执行构造方法,赋值了4个普通字段。其中self.RequestHandlerClass=RequestHandlerClass=Myclass

BaseServer构造方法执行完毕,执行TCPServer的构造方法:

1:首先创建一个socket。

2:执行server_bind()方法。

3:执行server_activeate()方法。

然后看下server_bing()方法执行了什么?

绑定server_address,而server_adress是我们程序中:

IP和PORT的元组,也就是说该步是服务端绑定监听端口。

执行server_activeate()方法,执行了什么呢?

创建客户端连接可排队等待连接池。

如上是初始化ThreadingTCPServer一个对象所执行的操作。

也就说:该步执行了,服务端socket的建立。

代码的10行执行完。

代码:

代码执行了什么呢?

由5-1图所示,只有类:BaseServer有方法server_forerver()。

由上图可以看见server_forerver()中执行了,IO多路复用(select)来对服务端的socket进行监听“变化”。

 

如果返回的read值为真执行self._hand_request_noblock()方法。由5-1图可以看出只有BaseServer有这个方法。如下:

如上所示,该方法执行self.get_request()方法,由5-1所示只有TCPServer有该方法。

有上所示,该方法执行accpet方法来接收客户端的请求,并赋值给request,client_address所以贯穿整个程序的self.request是服务端跟客户端进行通信的套接字。

也就是说上面操作已经建立好连接、并监控服务端的socket的变化。等待客户端的请求连接。

 接下来交给多线程并发处理我们的请求。

我们来看下方法:process_request方法执行了什么?

由5-1图可以看到:

只有类:ThreadingMixIn有process_request方法。如下:

如上所示:该方法创建线程,并执行方法:process_request_thread

我们来一起看下该方法执行了什么。该方法也在ThreadingMixIn类中。

该方法执行了finish_request方法。由5-1图可以看到:BaseServer类中有该方法:

如上所示执行构造RequestHandlerClass这个类对象。由上面我们推导我们知道:

 self.RequestHandlerClass=RequestHandlerClass=Myclass

也就是说执行我们定义的类Myclass的构造方法,由于我们定义的没有构造方法,那执行父类BaseRequestHandler方法.

由上可以看出来,该构造方法执行了self.setup以及 self.handle方法,由于我们没有定义self.setup,所以只执行:self.handle

而我们定义的类中有该方法,也就是执行我们自定义的handle方法。

由上可以得出:

建立完连接,socketserver创建多线程,并执行我们自定义的handle方法。

也就是socketserver说整个过程包括:

1:建立好连接、并监控服务端的socket的变化。等待客户端的请求连接。

2:建立完连接,socketserver创建多线程,并执行我们自定义的handle方法。

如上是socketserver的源码剖析~!

五:多线程:

在python中程序处理包括:

1:一个应用程序可以是多线程、多进程。

2:默认是单进程、单线程。

3:单进程多线程:IO操作不占用cpu。使用多线程提高并发。

4:计算性操作,占用CPU,用多进程提高并发。

在python中GIL(全局解释器锁),只能在同一时间只能有一个线程跟cpu交互。在java和C#中没有这个限制,可以单进程多个线程进行cpu交互计算。

多线程举例:

 1 #!/usr/bin/env python
 2 import time
 3 def f1(arg, t=None):
 4     if t:
 5         t._delete()
 6     time.sleep(5)
 7     print(arg)
 8 
 9 
10 # for i in range(10):
11 #     f1(i)
12 # 单进程、单线程的应用程序
13 import threading
14 t1 = threading.Thread(target=f1, args=(1,))
15 #t.setDaemon(True) # true,表示主线程不等此子线程,注意这个是线程设置,需要start之前定义
16 t1.start()# 不代表当前线程会被立即执行
17 #t.join(2)          # 表示主线程到此,等待 ... 直到子线程执行完毕,
18                    # 参数,表示主线程在此最多等待n秒
19 
20 t2 = threading.Thread(target=f1, args=(2,t1))
21 t2.start()# 不代表当前线程会被立即执行
22 print('end')
23 print('end')
24 print('end')
25 print('end')
26 print('end')

 t1 = threading.Thread(target=f1, args=(1,))中target表示做什么操作,args表示传入什么参数,如果没有参数的话,需要传入一个空元组。

单独join(超时间)使用的时候,不和setDaemon使用的话,会等待超时之后,执行下面的代码,配合使用是等待超时之后执行下面代码就结束主线程,不等子线程执行完。。

 

   

推荐阅读