python-3.x - Python udp socket 发送阻塞时间过长
问题描述
我目前正在为一个大学学科实习,我需要使用两个 UDP 套接字来发送和接收来自计算机的视频数据。
为此,我创建了以下名为 UDPConnection 的类。它基本上使用两个 UDP 套接字,一个用于接收数据,一个用于发送数据,以及两个线程。第一个读取从第一个 UDP 套接字接收到的所有数据并将其存储在缓冲区中,以便我可以随时获取它。第二个从输出缓冲区读取数据并将其写入发送套接字,这样我只需要将数据推送到输出套接字即可避免可能的阻塞。
两个线程都使用事件来了解它们是否必须停止/恢复。
我遇到的问题是,有时,发送和接收的数据之间存在很大的延迟(计算机距离为 20 厘米时甚至超过 10 秒)。当这种延迟发生时,输出缓冲区开始填满,而几乎没有收到任何包。看起来发送线程无法以足够的速度将存储在输出缓冲区中的数据推送到网络中,因为它甚至会出现超时异常,但是由于我使用的是 UDP 套接字,所以我不知道为什么会这样正在发生。如果不能这样做,套接字是否不应该推送数据或丢弃数据?
也许我的代码上还有另一个我没有注意到的错误?
我检查了所有事件,以确保没有任何线程被暂停或其他什么,但它们工作正常。
这是我的代码(线程使用的函数是_sendData和_rcvData):
class UDPConnection(object):
def __init__(self, orIP, dstIp, orPort, dstPort, logger, maxTimeout = 2, rcvDataLen = 65535):
super(UDPConnection, self).__init__()
self.buffer = []
self.outputBuffer = queue.Queue(maxsize=120)
self.orIP = orIP
self.dstIp = dstIp
self.orPort = orPort
self.dstPort = dstPort
self.maxTimeout = maxTimeout
self.logger = logger
self.rcvDataLen = rcvDataLen
self.pausedEvent = Event()
self.closedEvent = Event()
self.rcvThread = None
self.sendThread = None
# Initialize the connection and the send/receive threads
def start(self):
# Evento que indica que la conexion debe estar cerrada
self.closedEvent.clear()
# Inicializamos para que no se detengan los hilos
self.pausedEvent.set()
self.sendSocket = None
self.rcvSocket = None
# Receive socket - To receive information
try:
self.rcvSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.rcvSocket.bind((self.orIP, self.orPort))
self.rcvSocket.settimeout(self.maxTimeout)
except socket.error as e:
self.logger.exception('UDPConnection - Receive socket {}:{}.'.format(self.orIP, self.orPort))
return False
time.sleep(2)
self.logger.debug('UDPConnection - Escuchando en {}:{}'.format(self.orIP, self.orPort))
self.rcvThread = Thread(target=self._rcvData)
self.rcvThread.start()
self.sendThread = Thread(target=self._sendData)
self.sendThread.start()
# Send socket - To send information
try:
self.sendSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sendSocket.connect((self.dstIp, self.dstPort))
self.sendSocket.settimeout(self.maxTimeout)
except socket.error as e:
self.logger.exception('UDPConnection - Send socket {}:{}.'.format(self.dstIp, self.dstPort))
return False
self.logger.debug('UDPConnection - Enviando en {}:{}'.format(self.orIP, self.orPort))
return True
# Pauses the connection
def pause(self):
self.pausedEvent.clear()
# Resumes the connection
def resume(self):
self.pausedEvent.set()
# Closes the connection
def quit(self):
# In case it's paused, we prevent it from getting stuck
self.pausedEvent.set()
self.closedEvent.set()
time.sleep(self.maxTimeout)
self.sendSocket.close()
self.rcvSocket.close()
return True
def isClosed(self):
return self.closedEvent.is_set()
def isPaused(self):
return not self.pausedEvent.is_set()
def isRunning(self):
return self.pausedEvent.is_set() and not self.closedEvent.is_set()
# Auxiliar function that pushes an object into the reception buffer.
# Used for inherited classes
def _bufferPush(self, data):
self.buffer.append(data)
# Auxiliar function that gets an element from the reception buffer.
# Used for inherited classes
def _bufferPop(self):
r = self.buffer[0]
del self.buffer[0]
# Auxliar function that allows us to know the number of elements
# on the reception buffer.
def _bufferLen(self):
bufLen = len(self.buffer)
return bufLen
# Function that empties the output buffer
def _emptyOutputBuffer(self):
while True:
try:
self.outputBuffer.get(False)
except queue.Empty:
break
# Used to get data from the output socket and push it into the network.
# If the connection is paused, it removes all the data from the output buffer and waits till it gets resumed
def _sendData(self):
while not self.isClosed():
if self.isPaused():
self._emptyOutputBuffer()
self.pausedEvent.wait()
try:
data = self.outputBuffer.get(True, self.maxTimeout)
self.sendSocket.send(data)
except queue.Empty as e:
continue
except socket.error as e:
# Socket timeout, perdemos el frame.
print('Timeout while sending the frame')
self._emptyOutputBuffer()
continue
# Function used to get data from the network and push it into the received buffer.
def _rcvData(self):
while not self.isClosed():
# Comprobamos si la conexion esta o no parada
self.pausedEvent.wait()
try:
request = self.rcvSocket.recv(self.rcvDataLen)
self._bufferPush(request)
except socket.error as e:
continue
# Function used by the user to send data
def send(self, data):
if self.isRunning():
if not self.outputBuffer.full():
self.outputBuffer.put_nowait(data)
# Function used by the user to receive data
def rcv(self, timeout=5):
start = time.time()
bufLen = 0
while (time.time()-start) < timeout:
bufLen = self._bufferLen()
if bufLen != 0:
break
if bufLen == 0:
raise BlockingIOError('Timeout passed.')
return self._bufferPop()
然后,
有人可以帮我吗?
非常感谢!
解决方案
推荐阅读
- macos - 暗模式:NSColor.colorNamed 在某些方法中没有返回正确的颜色
- google-cloud-platform - 有什么方法可以使用 2 个不同的 json 服务密钥将文件从一个项目复制到另一个项目?
- javascript - 反应 setstate 采用以前的值
- node.js - 在猫鼬结果上使用 lodash cloneDeep() - 无法更新生成的 JSON
- php - SQL多表分组学生
- assembly - 在 LC-3 汇编中编写一个 LC-3 汇编器(只处理助记符 -> 操作码,而不是操作数)
- java - 并非所有数据都被读入一个双端双向链表,而是所有数据都被读入另外两个
- java - 用 SQL 连接的 JTable 在 eclipse 中看不到结果
- xaml - 如何更改基于另一个样式的属性模板中的控件模板的属性
- owl - 如何使用 owlapi 将两个 owl 文件(.ttl 格式)合并为一个新文件?