首页 > 技术文章 > socket及其相关(续篇)

yangyinghua 2016-01-03 20:07 原文

IO 多路复用

基本概念

IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:

(1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。
(2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
(3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
(4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。
(5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
 与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

总而言之,指通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。

windows python : 
	只支持 select 方法

mac python :
	只支持 select 方法
   
Linux python :
	支持 select poll epoll 三种方法

说了一大堆,估计大家也被整晕了,我们还是直接上示例吧~~

终端操作示例:

std.py

#f = file() , obj = socket(), sys.stdin = 终端输入
#select.select监听用户输入,如果用户输入内容,select 会感知 sys.sdtin 改变,将改变的文件句柄保存至列表,并将列表作为select第一个参数返回,如果用户未输入内容,select 第一个参数 = [],

import select
import threading
import sys

while True:
    readable, writeable, error = select.select([sys.stdin,],[],[],1)
    if sys.stdin in readable:
        print 'select get stdin',sys.stdin.readline()

执行结果:

go go go    #等待用户输入,用户输入: go go go 
select get stdin go go go 

ok          #用户输入 ok
select get stdin ok

socket操作示例

回顾之前socket参数之 -----> sk.setblocking(bool)

是否阻塞(默认True),<阻塞>,如果设置False,<不阻塞>,那么accept和recv时一旦无数据,则报错。

请看如下示例:

server.py

import socket
import time

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001))
sk1.listen(5)
sk1.setblocking(False)   #这里设置False,<不阻塞>

while True:
    try:                 #获取异常,如果不做这一步,获取不到数据则报错
        conn,addr = sk1.accept()  
        conn.close()
        print addr
    except Exception,e:
        print e
        time.sleep(2)

在客户端执行,如果客户端没有请求(用浏览器输入:http://127.0.0.1:8001),则报错,如有输入,则获取到值,如下:

Errno 35] Resource temporarily unavailable  #没有客户端请求时,报错
[Errno 35] Resource temporarily unavailable
[Errno 35] Resource temporarily unavailable
[Errno 35] Resource temporarily unavailable
[('127.0.0.1', 62393)                       #获取到值
('127.0.0.1', 62394)
[Errno 35] Resource temporarily unavailable
('127.0.0.1', 62395)
select 示例1:

select.py

import socket
import time
import select 

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001))
sk1.listen(5)
sk1.setblocking(False)

while True:
    #readable_list :如果有客户端连接,则有值,否则空列表  
    readable_list, writeable_list, error_list = select.select([sk1,],[],[],2)  #不是空列表,select感知,sk1获取到值
    for r in readable_list:    #不是空列表,循环执行
        conn,addr = r.accept()
        print addr

执行: python select.py
在浏览器输入: http://127.0.0.1:8001
执行结果:(服务器端打印日志)

('127.0.0.1', 62205)
('127.0.0.1', 62206)

接下来,再多监听一个端口(监听多端口):

import select
import socket

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001))  #监听8001端口
sk1.listen(5)
sk1.setblocking(False)

#新监听8002端
sk2 = socket.socket()
sk2.bind(('127.0.0.1',8002))  #监听8002端口 -->新监听的端口
sk2.listen(5)
sk2.setblocking(False)

while True:
    readable_list, writeable_list, error_list = select.select([sk1,sk2],[],[],1) #新增:[sk1,sk2]
    for r in readable_list:
        conn,addr = r.accept()
        print addr

再次在浏览器器输入新监听的端口,你会发现原来服务端只支持处理一个客户端请求,如今可以支持处理多个客户端请求:

http://127.0.0.1:8002,执行结果如下:

('127.0.0.1', 62222)
('127.0.0.1', 62223)
select 示例2:

服务端:server.py

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001))
sk1.listen(5)
sk1.setblocking(False)
inputs = [sk1,]   
#将上例中select.select([sk1,]...) 中[sk1,] 赋值给 inputs

#sk1 , conn 都是socket对象(文件描述符)


while True:
    readable_list, writeable_list, error_list = select.select(inputs,[],[],1)  #请看这里的修改
    time.sleep(2)              #因为执行处理速度太快,这里sleep 2s 
    print "inputs:",inputs     #打印inputs ,方便执行时观察变化
    print "res:",readable_list #打印readable_list ,方便执行时查看变化

    for r in readable_list:
        if r == sk1:  
        #判断是服务端还是客户端,如果是服务端才进行下面的操作,因为客户端没有accept方法
            conn,addr = r.accept()
            inputs.append(conn)
            print addr
        else:
        #如果是客户端,接受和返回数据
            client_data = r.recv(1024)
            r.sendall(client_data)

上面的server端详解如下:

#第一次请求进来(第一个客户端进来,只是连接,没有操作): readable_list = [sk1,], 第一次执行完后: inputs = [sk1,] ---> inputs = [sk1,conn1]
#第二次请求进来(第二个客户端进来,也只是连接,没有操作):readable_list = [sk1,] ,执行完后:inputs = [sk1,conn1,] ---> inputs = [sk1,conn1,conn2]
#如果第一个客户端发送一条数据,服务端的socket<sk1>不变,只是客户端的socket<即conn1>变化 :readable_list = [conn1,] , inputs = [sk1,conn1,conn2] 

#<conn1 应该是conn,这里conn1 代表第一个客户端进来>

----

#第一个参数,监听的句柄序列
#如果第二参数有参数,即只要不是空列表,select就能感知,然后writeabled_list就能获取值
#第三个参数监听描述符,监听是否出错,如果出错,则摘除
#第四个参数,阻塞时间,如 1秒(这个如果不写,select会阻塞住,直到监听的描述符发生变化才继续往下执行)
readable_list, writeable_list, error_list = select.select(inputs,[],[],1)  

客户端: client.py

import select
import socket

client = socket.socket()
client.connect(('127.0.0.1',8001))
client.settimeout(10)

while True:
    client_input =  raw_input('please input:').strip()
    client.sendall(client_input)
    server_data = client.recv(1024)
    print server_data

client.close()

服务端执行结果:

inputs: [<socket._socketobject object at 0x104cbbf30>]
res: []
inputs: [<socket._socketobject object at 0x104cbbf30>]
res: []

run_server

客户端连接之后,服务端日志(但是还没有输入)

inputs: [<socket._socketobject object at 0x104cbbf30>]
res: [<socket._socketobject object at 0x104cbbf30>]
('127.0.0.1', 62815)
inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>]
res: []

client1_connect

第二个客户端端连接:

inputs: [<socket._socketobject object at 0x104cbbf30>]
res: [<socket._socketobject object at 0x104cbbf30>]
('127.0.0.1', 62815)
inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>]
res: []

client2_connect

在客户端输入(其中一个client):

inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>, <socket._socketobject object at 0x104cfc050>]
res: []
inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>, <socket._socketobject object at 0x104cfc050>]
res: [<socket._socketobject object at 0x104cbbfa0>]
inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>, <socket._socketobject object at 0x104cfc050>]
res: []

注意:

我们知道,上面的例子,如客户端中途断开,在服务端连接并没有释放
对于上面的例子,做如下修改(下面的例子,客户端断开后,直接释放)

服务端修改如下

server.py

import select
import socket
import time

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001))
sk1.listen(5)
sk1.setblocking(False)
inputs = [sk1,]
outputs = []   #这里修改

while True:
    readable_list, writeable_list, error_list = select.select(inputs,outputs,[],2)  #情况这里修改
    time.sleep(2)
    print "inputs:",inputs
    print "res:",readable_list
    print "wri",writeable_list
    for r in readable_list:
        if r == sk1:
            conn,addr = r.accept()

            inputs.append(conn)
            outputs.append(conn)   #append句柄
            print addr
        else:
            client_data = r.recv(1024)
            if client_data:
                r.sendall(client_data)
            else:
                inputs.remove(r)   #如果没有收到客户端端数据,则移除客户端句柄

执行结果如下:

连个客户端连接:

断开其中一个客户端

此时,发现断开的那个客户端被自动剔除


下面是讲解 readable_list, writeable_list 读写拆分的示例

import select
import socket
import time

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001))
sk1.listen(5)
sk1.setblocking(False)
inputs = [sk1,]
outputs = []

while True:
    readable_list, writeable_list, error_list = select.select(inputs,outputs,[],1)
    #文件描述符可读 readable_list    只有变化的时候,感知
    #文件描述符可写 writeable_list   只要有,感知
    time.sleep(2)
    print "inputs:",inputs
    print "res:",readable_list
    print "wri",writeable_list
    for r in readable_list:
        if r == sk1:
            conn,addr = r.accept()
            inputs.append(conn)
            print addr
        else:
            client_data = r.recv(1024)
            #如果接受到数据,则将数据添加到outputs,writeable则能感知
            if client_data:
                outputs.append(r)

    for w in writeable_list:
    	 #如果列表有数据,则给客户端发送一条
        w.sendall('1234')
        #循环列表时,outputs获取文件句柄,只要outputs有客户端文件句柄,每一次列表循环都能获取数据,即第一次写完之后,不在给客户端发送数据
        outputs.remove(w)

Queue 队列

import Queue

q = Queue.Queue()
q.put(1)
q.put(2)
q.put(3)

print '第一个:',q.get()
print '第二个:',q.get()

执行结果:

import Queue

q = Queue.Queue()
q.put(1)                 #put数据到队列
q.put(2)
q.put(3)

print '第一个:',q.get()   #队列是先进先出,这里get了两次,则分别取出了1,2
print '第二个:',q.get()

如果队列没有数据,这时get数据,则会等待

import Queue

q = Queue.Queue()
q.get()

q.put(1)
q.put(2)
q.put(3)

上面的方式没有结果:(处于等待,没有输出)

下面的例子,如果没有数据,通过 get_nowait(),不会等待,但是会报错

import Queue

q = Queue.Queue()
q.get_nowait()
q.get()

q.put(1)
q.put(2)

结果如下:(报错)

/usr/bin/python /Users/yangallen214/PycharmProjects/ob_11/day10/que_demo.py
Traceback (most recent call last):
  File "/Users/yangallen214/PycharmProjects/ob_11/day10/que_demo.py", line 20, in <module>
    q.get_nowait()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/Queue.py", line 190, in get_nowait
    return self.get(False)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/Queue.py", line 165, in get
    raise Empty
Queue.Empty

通过try获取异常:

import Queue

q = Queue.Queue()
try:
    q.get_nowait()
    q.get()

    q.put(1)
    q.put(2)
    q.put(3)
except Queue.Empty:
    print "err"

上面的put 时也可以使用 q.put_nowait() 方法,这里不多说

readable_list, writeable_list 读写拆分的示例

import select
import socket
import Queue

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001))
sk1.listen(5)
sk1.setblocking(False)
inputs = [sk1,]
outputs = []
message = {}

#message = {
#    'c1':队列,
#    'c2':队列,【b,bb,bbb】
#}


while True:
    readable_list, writeable_list, error_list = select.select(inputs,outputs,[],1)
    #文件描述符可读 readable_list    只有变化,感知
    #文件描述符可写 writeable_list   只要有,感知

    for r in readable_list:
        if r == sk1:
            conn,addr = r.accept()
            inputs.append(conn)
            message[conn] = Queue.Queue()
        else:
            client_data = r.recv(1024)
            if client_data:
                #获取数据
                outputs.append(r)
                #在指定队列中插入数据
                message[r].put(client_data)
            else:
                inputs.remove(r)
                #如果空队列则删除
                del message[r] 

    for w in writeable_list:
        #去指定队列取数据
        try:
            data = message[w].get_nowait()
            w.sendall(data)
        except Queue.Empty:
            pass
        outputs.remove(w)
        #如果空队列则删除
        #del message[w]   

select、多路复用 先讲这么多,下次继续更新...

更多连接: http://www.cnblogs.com/wupeiqi

推荐阅读