首页 > 解决方案 > Python udp socket 发送阻塞时间过长

问题描述

我目前正在为一个大学学科实习,我需要使用两个 UDP 套接字来发送和接收来自计算机的视频数据。

为此,我创建了以下名为 UDPConnection 的类。它基本上使用两个 UDP 套接字,一个用于接收数据,一个用于发送数据,以及两个线程。第一个读取从第一个 UDP 套接字接收到的所有数据并将其存储在缓冲区中,以便我可以随时获取它。第二个从输出缓冲区读取数据并将其写入发送套接字,这样我只需要将数据推送到输出套接字即可避免可能的阻塞。

两个线程都使用事件来了解它们是否必须停止/恢复。

我遇到的问题是,有时,发送和接收的数据之间存在很大的延迟(计算机距离为 20 厘米时甚至超过 10 秒)。当这种延迟发生时,输出缓冲区开始填满,而几乎没有收到任何包。看起来发送线程无法以足够的速度将存储在输出缓冲区中的数据推送到网络中,因为它甚至会出现超时异常,但是由于我使用的是 UDP 套接字,所以我不知道为什么会这样正在发生。如果不能这样做,套接字是否不应该推送数据或丢弃数据?

也许我的代码上还有另一个我没有注意到的错误?

我检查了所有事件,以确保没有任何线程被暂停或其他什么,但它们工作正常。

这是我的代码(线程使用的函数是_sendData和_rcvData):

class UDPConnection(object):
    def __init__(self, orIP, dstIp, orPort, dstPort, logger, maxTimeout = 2, rcvDataLen = 65535):
        super(UDPConnection, self).__init__()
        self.buffer = []
        self.outputBuffer = queue.Queue(maxsize=120)
        self.orIP = orIP
        self.dstIp = dstIp
        self.orPort = orPort
        self.dstPort = dstPort
        self.maxTimeout = maxTimeout
        self.logger = logger
        self.rcvDataLen = rcvDataLen

        self.pausedEvent = Event()
        self.closedEvent = Event()
        self.rcvThread = None
        self.sendThread = None

    # Initialize the connection and the send/receive threads
    def start(self):
        # Evento que indica que la conexion debe estar cerrada
        self.closedEvent.clear()
        # Inicializamos para que no se detengan los hilos
        self.pausedEvent.set()

        self.sendSocket = None
        self.rcvSocket = None
        # Receive socket - To receive information
        try:
            self.rcvSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.rcvSocket.bind((self.orIP, self.orPort))
            self.rcvSocket.settimeout(self.maxTimeout)
        except socket.error as e:
            self.logger.exception('UDPConnection - Receive socket {}:{}.'.format(self.orIP, self.orPort))
            return False

        time.sleep(2)
        self.logger.debug('UDPConnection - Escuchando en {}:{}'.format(self.orIP, self.orPort))

        self.rcvThread = Thread(target=self._rcvData)
        self.rcvThread.start()

        self.sendThread = Thread(target=self._sendData)
        self.sendThread.start()


        # Send socket - To send information
        try:
            self.sendSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.sendSocket.connect((self.dstIp, self.dstPort))
            self.sendSocket.settimeout(self.maxTimeout)
        except socket.error as e:
            self.logger.exception('UDPConnection - Send socket {}:{}.'.format(self.dstIp, self.dstPort))
            return False

        self.logger.debug('UDPConnection - Enviando en {}:{}'.format(self.orIP, self.orPort))
        return True

    # Pauses the connection
    def pause(self):
        self.pausedEvent.clear()

    # Resumes the connection
    def resume(self):
        self.pausedEvent.set()

    # Closes the connection
    def quit(self):
        # In case it's paused, we prevent it from getting stuck
        self.pausedEvent.set()

        self.closedEvent.set()
        time.sleep(self.maxTimeout) 
        self.sendSocket.close()
        self.rcvSocket.close()
        return True


    def isClosed(self):
        return self.closedEvent.is_set()

    def isPaused(self):
        return not self.pausedEvent.is_set()

    def isRunning(self):
        return self.pausedEvent.is_set() and not self.closedEvent.is_set()

    # Auxiliar function that pushes an object into the reception buffer.
    # Used for inherited classes
    def _bufferPush(self, data):
        self.buffer.append(data)

    # Auxiliar function that gets an element from the reception buffer.
    # Used for inherited classes
    def _bufferPop(self):
        r = self.buffer[0]
        del self.buffer[0]

    # Auxliar function that allows us to know the number of elements 
    # on the reception buffer. 
    def _bufferLen(self):
        bufLen = len(self.buffer)
        return bufLen

    # Function that empties the output buffer
    def _emptyOutputBuffer(self):
        while True:
            try:
                self.outputBuffer.get(False)
            except queue.Empty:
                break

    # Used to get data from the output socket and push it into the network.
    # If the connection is paused, it removes all the data from the output buffer and waits till it gets resumed

    def _sendData(self):
        while not self.isClosed():
            if self.isPaused():
                self._emptyOutputBuffer()

            self.pausedEvent.wait()
            try:
                data = self.outputBuffer.get(True, self.maxTimeout)
                self.sendSocket.send(data)
            except queue.Empty as e:
                continue
            except socket.error as e:
                # Socket timeout, perdemos el frame.
                print('Timeout while sending the frame')
                self._emptyOutputBuffer()
                continue

    # Function used to get data from the network and push it into the received buffer.
    def _rcvData(self):
        while not self.isClosed():
            # Comprobamos si la conexion esta o no parada
            self.pausedEvent.wait()

            try:
                request = self.rcvSocket.recv(self.rcvDataLen) 
                self._bufferPush(request)
            except socket.error as e:
                continue

    # Function used by the user to send data
    def send(self, data):
        if self.isRunning():
            if not self.outputBuffer.full():
                self.outputBuffer.put_nowait(data)

    # Function used by the user to receive data
    def rcv(self, timeout=5):
        start = time.time()
        bufLen = 0
        while (time.time()-start) < timeout:
            bufLen = self._bufferLen()
            if bufLen != 0:
                break

        if bufLen == 0:
            raise BlockingIOError('Timeout passed.')

        return self._bufferPop()

然后,

有人可以帮我吗?

非常感谢!

标签: python-3.xmultithreadingsocketsudptimeoutexception

解决方案


推荐阅读