首页 > 解决方案 > Python Sockets - 连接了两个客户端套接字的服务器,在两个单独的线程上,重叠接收数据

问题描述

我有一个用 Python 编写的服务器,它为每个连接的客户端启动一个带有新客户端套接字的新线程。

s = socket.socket()  

s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
s.bind((socket.gethostname(), server_port))  
s.listen(5)


while True:
    (clientsock, address) = s.accept()  

    # Instantiates a 'Client' object  
    user = client.client(clientsock, address)  

    # Set up client thread
    client_thread = threading.Thread(target=do_client_operations , args=(user,))

    client_thread.start()

在每个生成的线程中,服务器通过连接的客户端套接字进入自己的发送/接收循环,并处理来自客户端的消息。

我在服务器和客户端之间实现了一个简单的消息传递协议,以及一些自动消息来检查该特定套接字是否仍然连接。当客户端断开连接时,服务器会相应地更新其仍然连接的“列表”。

该功能似乎工作正常,但通常当客户端向服务器发送特定消息时,例如,客户端 1 询问“告诉我当前谁连接到服务器”,服务器将改为向客户端 2 发送响应,因为如果client2.recv()之前收到消息client1.recv()

做了一些研究,似乎套接字不是线程安全的,并且以这种方式编写程序并不是一个好的设计选择(我很久以前就开始了这个项目,当时我是 Python 的新手)

我似乎不明白为什么两个独立线程上的两个独立套接字从同一个网络缓冲区“接收”?不管线程如何,由同一进程创建的多个套接字是否执行此操作?

不幸的是,我已经走得太远了,重写这个逻辑将是一项艰巨的任务。我正在考虑向消息传递协议添加一些附加功能,其中客户端在消息中添加一些唯一 ID,以便服务器知道消息来自何处以及响应谁。

但我不得不想象有一些更简单的方法可以做到这一点。

有什么想法吗?(另外我希望这足够清楚,我不会在这里发布太多)

更新:更多代码

所以这里是每个新线程进入的主循环。user参数是创建的对象Client,其中包含s.accept()服务器主循环中返回的客户端套接字。

def main_client_loop(user):

    # Do not let socket timeout, wait on messages from client
    user.socket_object.settimeout(None)
    client_msg_list = user.receive_msg()
    s_globals.message_handler(client_msg_list, user)

这是服务器的接收功能

def receive_msg(self):
    logger.debug("In receive function")
    is_pickle = False

    # Receive initial data and extract message length
    data = self.socket_object.recv(s_globals.BUFFER)
    logger.debug("Received data: " + str(data.decode()))

    if len(data) is 0:
        logger.debug("Client terminated connection")
        self.is_connected = False
        s_globals.client_master_list.remove(self)
        quit()

    msg_list_out = []
    msg_list_in = data.decode().split('\0')
    for msg in msg_list_in:
        if len(msg) is 0:
            break
        else:
            # Otherwise add to outlist normally
            msg_list_out.append(msg[4:len(msg)])

    return msg_list_out

相对简单的代码,客户端套接字接收由空字节分隔的消息并将它们放入一个列表中,该列表提供给处理程序进行处理。此函数是Client对象的一部分,因此它作用于self.

def message_handler(server_messages, caller):
    logger.debug("In message_handler with caller: " + str(caller))
    """
    :type server_message: str
    :type call: client_class_user.client_user
    """
    handler_table= {
        SERVER_USERLIST_REQUEST: partial(send_active_user_list, caller),
        CLIENT_HELLO: caller.poll_response_good
    }

    for msg in server_messages:
        try:
            exec_response = handler_table[msg]
            logger.debug("Executing function for msg: " + str(msg) + " - "+ str(exec_response))
            exec_response()
        except Exception as e:
            logger.debug("Exception raised when executing function for: " + str(msg) + str(e))
            do_nothing()

以上是处理来自客户端的传入消息的代码的一部分。从我看到的一些日志数据(我会在可以重现时发布)中,消息处理程序进入函数时,caller参数是不同的套接字,而不是参数中的同一对象caller调用其自己的方法时记录的套接字receive_msg()

这就是我目前难住的地方。会随着我的进展而更新。

如果有人还在阅读,这里有一个错误似乎发生在哪里的日志打印输出。

1545428257.7718635: Received data from: ('CLIENT_1_ADDRESS', 49807) on socket: <socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('SERVER_ADDRESS', 6881), raddr=('CLIENT_1_ADDRESS', 49807)>
**************
0024server_userlist_request^@
1545428257.7718995: In message_handler with caller: ('CLIENT_1_ADDRESS', 49807)
1545428257.7719202: Executiing message for caller ('CLIENT_1_ADDRESS', 49807) @ <class_client_user.client_user object at 0x7f905a5c1ba8>
1545428257.7719324: Executing function for msg: server_userlist_request - functools.partial(<function send_active_user_list at 0x7f905af79378>, <class_client_user.client_user object at 0x7f905a5c1ba8>)
1545428257.7719395: Sending user list in pickled format to caller: ('CLIENT_1_ADDRESS', 49807)
1545428257.7719553: in send_msg with ('CLIENT_2_ADDRESS', 48266)
1545428257.7719624: Received bytes message to send: b'\x80\x03]q\x00(X\x05\x00\x00\x0028263q\x01X\x05\x00\x00\x0027528q\x02e.'
1545428257.7719736: Sending data: b'p0033\x80\x03]q\x00(X\x05\x00\x00\x0028263q\x01X\x05\x00\x00\x0027528q\x02e.\x00' to socket ('CLIENT_2_ADDRESS', 48266)
1545428258.1330225: in main_client_loop
1545428258.1330776: In receive function with ('CLIENT_2_ADDRESS', 48266)
1545428260.775301: in main_client_loop
1545428260.7753537: In receive function with ('CLIENT_1_ADDRESS', 49807)

客户端套接字 1 接收客户端 1 对用户列表的请求,将消息传递给处理程序,并将消息和客户端套接字 1 作为参数。消息处理程序使用客户端 1 套接字执行发送用户列表功能。

该函数基本上获取当前列表,将其腌制并调用传递的套接字对象的send_msg()方法。

但是,一旦send_msg()被调用,日志就会表明它实际上是 Client Socket 2,它将发送消息(带有用户列表)。所以客户端 1 请求列表,客户端 2 接收它。

那个尤里卡时刻还没有击中我,我很难理解为什么客户端套接字 2 突然控制接收到的消息和客户端套接字 1 调用的方法,因为这两个套接字存在于不同的线程上。

更新:已解决

def send_active_user_list(user):
    """
    :type user: class_client_user.client_user
    """
    logger.debug("Sending user list in pickled format to caller: " + str(user.ip_address))
    uid_list = []
    for user in client_master_list:
        if user.uid not in client_master_list:
            uid_list.append(user.uid)

    user.send_msg(pickle.dumps(uid_list))

我不敢相信我没有看到这个。由于一些荒谬(可能是懒惰)的原因,我决定user在我的迭代中使用相同的变量client_master_list(它包含所有登录的用户)作为我的迭代变量,它最终修改了我最初传递的用户参数,所以send_msg()user列表中最后一次迭代的对象。

这解释了为什么第一个日志语句正确写入原始参数,但send_msg()在最后一个用户登录服务器时执行。

这甚至不是线程问题,只是粗心。

这可以被任何人关闭

标签: pythonmultithreadingsockets

解决方案


推荐阅读