Is a non-blocking socket always writable once the connection is established in Python?

Problem description

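I want to build a port mapping tool in Python from scratch. Basically, it is a TCP proxy that relays all traffic between a client and a target service or application.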

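Specifically, for each connection I create two sockets, one that talks to the client and one that talks to the target service. To get I/O multiplexing, I use the selectors module to monitor both sockets for EVENT_READ and EVENT_WRITE events. However, I found that the sockets are always writable, so the main loop never blocks at all. Is that normal? My code is as follows: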

import socket
import selectors

def recv_from(sock):
    data = b''
    try:
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                break
            data += chunk
    except OSError:
        # BlockingIOError (no more data for now) and connection errors both end the read.
        pass
    return data

class RelayHandlder:

    def __init__(self, client_sock, remote_sock, selector):
        '''
        client_sock and remote_sock have already finished the connection.
        '''
        self._client_sock = client_sock
        self._remote_sock = remote_sock
        self._selector = selector

        self._send_buffer = b''
        self._recv_buffer = b''

        self._selector.register(self._client_sock, selectors.EVENT_READ|selectors.EVENT_WRITE, self._client_handler)
        self._selector.register(self._remote_sock, selectors.EVENT_READ|selectors.EVENT_WRITE, self._remote_handler)

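    def _client_handler(self, client_sock, mask):
        if mask & selectors.EVENT_READ:
            data = recv_from(client_sock)
            if data:
                # Append so bytes not yet forwarded are not overwritten.
                self._send_buffer += data
            else:
                self._close()
        elif mask & selectors.EVENT_WRITE:
            if self._recv_buffer:
                try:
                    # send() may accept only part of the buffer; keep the rest.
                    sent = client_sock.send(self._recv_buffer)
                    self._recv_buffer = self._recv_buffer[sent:]
                except BlockingIOError:
                    pass
                except OSError:
                    self._close()

    def _remote_handler(self, remote_sock, mask):
        if mask & selectors.EVENT_READ:
            data = recv_from(remote_sock)
            if data:
                # Append so bytes not yet forwarded are not overwritten.
                self._recv_buffer += data
            else:
                self._close()
        elif mask & selectors.EVENT_WRITE:
            if self._send_buffer:
                try:
                    # send() may accept only part of the buffer; keep the rest.
                    sent = remote_sock.send(self._send_buffer)
                    self._send_buffer = self._send_buffer[sent:]
                except BlockingIOError:
                    pass
                except OSError:
                    self._close()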

    def _close(self):
        print('Closing ...')
        for sock in (self._client_sock, self._remote_sock):
            try:
                self._selector.unregister(sock)
            except (KeyError, ValueError):
                # Already unregistered by an earlier _close() call.
                pass
            sock.close()
        self._send_buffer = b''
        self._recv_buffer = b''

class PortMapper:
    '''
    Map the remote port to local.
    '''

    def __init__(self, proxy_ip, proxy_port, remote_ip, remote_port):
        self.proxy_ip = proxy_ip
        self.proxy_port = proxy_port
        self.remote_ip = remote_ip
        self.remote_port = remote_port

        self._selector = selectors.DefaultSelector()

        self._proxy_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        while True:
            try:
                self._proxy_sock.bind((proxy_ip, proxy_port))
                break
            except OSError:
                proxy_port += 1
                self.proxy_port = proxy_port
        self._proxy_sock.listen(10)
        self._proxy_sock.setblocking(False)
        self._selector.register(self._proxy_sock, selectors.EVENT_READ, self._accept_handler)
        print('Listening at {}:{}'.format(proxy_ip, proxy_port))

    def _accept_handler(self, proxy_sock, mask):
        client_sock, addr = proxy_sock.accept()
        client_sock.setblocking(False)
        print('Accept from {}'.format(addr))

        remote_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        remote_sock.setblocking(False)
        try:
            remote_sock.connect((self.remote_ip, self.remote_port))
        except BlockingIOError:
            pass

        RelayHandlder(client_sock, remote_sock, self._selector)

    def loop(self):
        while True:
            events = self._selector.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Port Map Tool.')
    parser.add_argument('-H', '--remote-host', required=True, type=str, help='Remote host.')
    parser.add_argument('-P', '--remote-port', required=True, type=int, help='Remote port.')
    parser.add_argument('-p', '--local-port', default=1000, type=int, help='Local port.')
    args = parser.parse_args()

    PortMapper('0.0.0.0', args.local_port, args.remote_host, args.remote_port).loop()

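In the code, self._send_buffer in RelayHandlder caches the data received from the client. If self._remote_sock is writable and self._send_buffer is not empty, the proxy sends self._send_buffer to the remote service. The logic for self._client_sock is similar. The main loop is defined in the loop method of PortMapper.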

I have two questions:

Tags: python, sockets

Solution


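A socket is normally writable until the system's send buffer is full. That is why many simple select-based multiplexing programs only monitor for readability: they assume that writes will succeed, or accept the possibility of blocking when they do not.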
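The behaviour described above can be observed with a small experiment. The following is a minimal sketch, not part of the original code: it uses socket.socketpair() as a stand-in for a connected TCP pair, and the helper writable_now is a hypothetical name introduced only for this illustration. It polls for EVENT_WRITE three times: on an idle socket, after the send buffer has been filled, and after the peer has drained it.

```python
import selectors
import socket


def writable_now(sock):
    """Return True if a zero-timeout select reports sock as writable."""
    sel = selectors.DefaultSelector()
    try:
        sel.register(sock, selectors.EVENT_WRITE)
        return bool(sel.select(timeout=0))
    finally:
        sel.close()


# A connected pair of sockets; an assumption standing in for client/remote.
a, b = socket.socketpair()
a.setblocking(False)
b.setblocking(False)

# Nothing has been sent yet, so the send buffer is empty.
idle_writable = writable_now(a)

# Fill a's send buffer: b never reads, so send() eventually cannot proceed.
try:
    while True:
        a.send(b'x' * 65536)
except BlockingIOError:
    pass
full_writable = writable_now(a)

# Drain everything on the receiving side to free buffer space again.
try:
    while b.recv(65536):
        pass
except BlockingIOError:
    pass
drained_writable = writable_now(a)

print(idle_writable, full_writable, drained_writable)

a.close()
b.close()
```

The idle socket should be reported as writable, which is why a loop that always registers EVENT_WRITE returns from select() immediately. Exact buffer sizes, and therefore how many bytes are needed to fill the buffer, depend on the platform.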

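If you want to be extra safe and make sure you can write, ignore EVENT_WRITE except when you actually have something ready to write. And to keep your code from exhausting local memory with buffered data, the relay should also stop reading (ignore EVENT_READ) on one side while the other side cannot be written to.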
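One way to apply this advice is to recompute each socket's event mask from the buffer state after every event. The sketch below is an illustration under stated assumptions, not the original poster's code: the class name Relay, the constant HIGH_WATER and its value are hypothetical. Because the selectors module does not accept an empty event mask, a socket with nothing to write and no permission to read is unregistered until it has something to do again. For brevity, the sketch closes both sockets on EOF without flushing pending data.

```python
import selectors
import socket

HIGH_WATER = 64 * 1024  # assumed cap on buffered bytes per direction


class Relay:
    """Two-way relay between sockets a and b with on-demand event interest."""

    def __init__(self, sel, a, b):
        self.sel = sel
        self.peer = {a: b, b: a}
        # out[s] holds bytes waiting to be sent TO socket s.
        self.out = {a: bytearray(), b: bytearray()}
        # mask[s] is the currently registered mask (0 means unregistered).
        self.mask = {a: selectors.EVENT_READ, b: selectors.EVENT_READ}
        self.closed = False
        for s in (a, b):
            s.setblocking(False)
            sel.register(s, selectors.EVENT_READ, self.on_event)

    def _update(self, sock):
        """Recompute which events are wanted for sock from the buffers."""
        want = 0
        if self.out[sock]:
            want |= selectors.EVENT_WRITE  # only when there is data to send
        if len(self.out[self.peer[sock]]) < HIGH_WATER:
            want |= selectors.EVENT_READ   # pause reading when peer is backed up
        have = self.mask[sock]
        if want == have:
            return
        if have == 0:
            self.sel.register(sock, want, self.on_event)
        elif want == 0:
            self.sel.unregister(sock)
        else:
            self.sel.modify(sock, want, self.on_event)
        self.mask[sock] = want

    def on_event(self, sock, mask):
        if self.closed:
            return  # stale event from the same select() batch
        peer = self.peer[sock]
        if mask & selectors.EVENT_WRITE and self.out[sock]:
            try:
                sent = sock.send(self.out[sock])
            except BlockingIOError:
                sent = 0
            except OSError:
                self.close()
                return
            del self.out[sock][:sent]  # keep whatever was not sent
        if mask & selectors.EVENT_READ:
            try:
                chunk = sock.recv(4096)
            except BlockingIOError:
                chunk = None
            except OSError:
                self.close()
                return
            if chunk == b'':
                self.close()  # EOF; pending data is dropped in this sketch
                return
            if chunk:
                self.out[peer] += chunk
        self._update(sock)
        self._update(peer)

    def close(self):
        if self.closed:
            return
        self.closed = True
        for s in self.peer:
            if self.mask[s]:
                self.sel.unregister(s)
            s.close()
```

The handler has the same (fileobj, mask) signature as the callbacks in the question, so it can be driven by the same kind of loop: call key.data(key.fileobj, mask) for each (key, mask) pair returned by select(). With this arrangement select() blocks while both directions are idle, because no socket is registered for EVENT_WRITE unless data is pending for it.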

