首页 > 解决方案 > Python - 使用选择器的非阻塞套接字

问题描述

简而言之,我的问题:我不知道选择器如何知道哪个套接字应该首先读取或写入。

它是一个可以处理多个连接的服务器,它的流程应该是:

  1. 服务器创建监听套接字
  2. 客户端创建 2 个套接字并将它们连接到服务器
  3. 客户端 2 套接字发送消息
  4. 服务器 2 套接字回显这些消息,客户端和服务器关闭连接

这就是发生的情况,但是如果创建的服务器套接字将首先写入,则连接将立即关闭或抛出异常(?),因为它甚至不调用发送并且客户端套接字将什么也不接收。那么选择器如何知道哪些套接字应该首先准备好写入/读取呢?我错过了哪些信息来理解这一点?

服务器:

import socket
import selectors
import types

host = "127.0.0.1"
port = 63210

def accept_wrapper(sock):
    conn, addr = sock.accept()
    print('accepted connection from', addr)
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
    events = selectors.EVENT_READ | selectors.EVENT_WRITE
    sel.register(conn, events, data=data)

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)
        if recv_data:
            data.outb += recv_data
        else:
            print('closing connection to', data.addr)
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            print('echoing', repr(data.outb), 'to', data.addr)
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]

sel = selectors.DefaultSelector()

lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print('listening on', (host, port))
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)

while True:
    events = sel.select(timeout=None)
    for key, mask in events:
        if key.data is None:
            accept_wrapper(key.fileobj)
        else:
            service_connection(key, mask)

客户:

import socket
import selectors
import types

host = "127.0.0.1"
port = 63210
num_conns = 2
messages = [b'Message 1 from client.', b'Message 2 from client.']

def start_connections(host, port, num_conns):
    server_addr = (host, port)
    for i in range(0, num_conns):
        connid = i + 1
        print('starting connection', connid, 'to', server_addr)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.connect_ex(server_addr)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        data = types.SimpleNamespace(connid=connid,
                                     msg_total=sum(len(m) for m in messages),
                                     recv_total=0,
                                     messages=list(messages),
                                     outb=b'')
        sel.register(sock, events, data=data)

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)
        if recv_data:
            print('received', repr(recv_data), 'from connection', data.connid)
            data.recv_total += len(recv_data)
        if not recv_data or data.recv_total == data.msg_total:
            print('closing connection', data.connid)
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if not data.outb and data.messages:
            data.outb = data.messages.pop(0)
        if data.outb:
            print('sending', repr(data.outb), 'to connection', data.connid)
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]

sel = selectors.DefaultSelector()
start_connections(host, port, num_conns)

while True:
    events = sel.select(timeout=None)
    for key, mask in events:
        service_connection(key, mask)

标签: pythonsocketsselect

解决方案


套接字实际上并不直接写入对等方,也不会从对等方读取。相反,它们写入本地套接字特定的写入缓冲区并从套接字特定的读取缓冲区读取。操作系统内核关心数据从套接字写入缓冲区到对等点的传递,并将从对等点接收到的数据包放入套接字接收缓冲区。

这些内核套接字缓冲区的状态和对这些缓冲区的更改可以使用诸如selectpoll、之类的函数进行监视kqueue。本质上:如果套接字写入缓冲区中有空间,则认为套接字是可写的。如果套接字读取缓冲区中有数据,则认为套接字是可读的。


推荐阅读