首页 > 解决方案 > Python Socket 接收/发送多线程

问题描述

我正在编写一个 Python 程序,在主线程中,我使用 recv 函数通过 TCP 套接字连续(循环)接收数据。在回调函数中,我使用 sendall 函数通过同一个套接字发送数据。触发回调的原因无关紧要。我已将套接字设置为阻塞。

我的问题是,这样做安全吗?我的理解是在单独的线程(不是主线程)上调用回调函数。Python 套接字对象是线程安全的吗?根据我的研究,我得到了相互矛盾的答案。

标签: pythonmultithreadingsocketsthread-safety

解决方案


Python 中的套接字不是线程安全的。

你试图一次解决几个问题:

  1. 套接字不是线程安全的。
  2. recv 正在阻塞并阻塞主线程。
  3. sendall 正在从不同的线程中使用。

您可以通过使用asyncio或通过 asyncio 在内部解决它的方式来解决这些问题:通过select.select与 a 一起使用socketpair,并为传入数据使用队列。

import select
import socket
import queue

# Any data received by this queue will be sent
send_queue = queue.Queue()

# Any data sent to ssock shows up on rsock
rsock, ssock = socket.socketpair()

main_socket = socket.socket()

# Create the connection with main_socket, fill this up with your code

# Your callback thread
def different_thread():
    # Put the data to send inside the queue
    send_queue.put(data)

    # Trigger the main thread by sending data to ssock which goes to rsock
    ssock.send(b"\x00")

# Run the callback thread

while True:
    # When either main_socket has data or rsock has data, select.select will return
    rlist, _, _ = select.select([main_socket, rsock], [], [])
    for ready_socket in rlist:
        if ready_socket is main_socket:
            data = main_socket.recv(1024)
            # Do stuff with data, fill this up with your code
        else:
            # Ready_socket is rsock
            rsock.recv(1)  # Dump the ready mark
            # Send the data.
            main_socket.sendall(send_queue.get())

我们在这里使用多个构造。您必须用您选择的代码填充空白处。至于解释:

我们首先创建一个send_queue要发送的数据队列。然后,我们创建一对连接的套接字 ( socketpair())。我们稍后需要这个来唤醒主线程,因为我们不希望recv()阻塞和阻止写入套接字。

然后,我们连接main_socket并启动回调线程。现在这是魔术:

在主线程中,我们select.select用来知道rsockormain_socket是否有任何数据。如果其中一个有数据,则主线程唤醒。

ssock将数据添加到队列后,我们通过发出信号唤醒主线程rsock,从而从select.select.

为了完全理解这一点,您必须阅读select.select()和。socketpair()queue.Queue()


@tobias.mcnulty 在评论中提出了一个很好的问题:为什么我们应该使用 aQueue而不是通过套接字发送所有数据?

您也可以使用socketpair发送数据,这有其好处,但由于多种原因,通过队列发送可能更可取:

  1. 通过套接字发送数据是一项昂贵的操作。它需要一个系统调用,需要在系统缓冲区内来回传递数据,并且需要充分利用 TCP 堆栈。使用 aQueue保证,我们将只有 1 次调用 - 用于单字节信号 - 而不会更多(除了队列的内部锁,但那个非常便宜)。通过 发送大数据socketpair将导致多个系统调用。作为提示,您也可以使用collections.deque由于 GIL 而 CPython 保证是线程安全的。这样,除了socketpair.
  2. 在架构方面,使用队列可以让您在以后进行更细粒度的控制。例如,数据可以以您希望的任何类型发送,然后进行解码。这可以让主循环更智能一些,并且可以帮助您创建更简单的界面。
  3. 你没有大小限制。它可以是错误或功能。我相信不完全鼓励更改系统的缓冲区大小,这会自然限制您可以发送的数据量。这可能是一个好处,但应用程序可能希望自己控制它。使用“自然”特性会导致调用线程挂起。
  4. 就像socketpair.recv系统调用一样,对于大数据,您也将通过多次select调用。TCP 没有消息边界。您要么必须创建人造的,将套接字设置为非阻塞并处理异步套接字,要么将其视为流并不断传递select调用,这取决于您的操作系统可能很昂贵。
  5. 支持同一套接字对上的多个线程。从多个线程通过套接字发送 1 个字节来发送信号很好,这正是 asyncio 的工作原理。发送更多可能会导致数据以错误的顺序发送。

总而言之,在内核和用户空间之间来回传输数据是可能的并且会起作用,但我个人不建议这样做。


推荐阅读