首页 > 解决方案 > Tornado 接收来自多播组的 UDP 数据包

问题描述

我有一台服务器,我想从多播组接收数据。是否有任何内置功能可用于接收此多播 UDP 数据包?

编辑:代码实现
我已经实现了代码,如下所示:

#!/usr/bin/env python

import socket
import struct
import os
import errno
import binascii
import tornado.ioloop
from tornado.ioloop import IOLoop
from tornado.platform.auto import set_close_exec

class UDPHandler():
    """ 
    Connect to multicast group 
    """
    def __init__(self, ip, port, io_loop):
        self.io_loop    = io_loop
        self._multiIP   = ip
        self.port       = port
        self._sock      = None
        self._socket    = {} # fd -> socket object

    def conn(self):
        """
        Listner to multicast group
        """
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._sock.settimeout(3)
        self._sock.bind(('', self.port))
        self._sock.setblocking(0)
        group   = socket.inet_aton(self._multiIP)
        mreq    = struct.pack('4sL', group, socket.INADDR_ANY)
        self._sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        self._socket[self._sock.fileno()] = self._sock
        print("self._sock:", self._sock)

    def onRx(self, data, addr):
        print("addr, data:", addr, len(str(data)))
        print(data)

    def r(self):
        self.conn()
        add_socket_handler(self._sock, self.onRx, self.io_loop)

def add_socket_handler(sock, callback, io_loop):
    def accept_handler(fd, events):
        while True:
            try:
                data, address = sock.recvfrom(1024)
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    callback(None, None)
            except Exception as e:
                print("except:", e)
                callback(None, None)

            callback(data, address)
    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)

def periodic():
    # print("periodic")
    None
def main():
    MULTICAST_IP = "224.1.1.10"
    RECEIVE_PORT = 10003
    udpRx = UDPHandler(MULTICAST_IP, RECEIVE_PORT, tornado.ioloop.IOLoop.current())
    udpRx.r()
    tornado.ioloop.PeriodicCallback(periodic, 1000).start()
    tornado.ioloop.IOLoop.current().start()

if __name__ == "__main__":
    main()

现在的问题是即使我收到一个数据包,我也会在一个循环中收到相同的数据包,我一遍又一遍地接收相同的数据包。代码有问题吗?特别是与add_socket_handler

编辑 2:
我在 while 循环中添加了一个 break 语句,add_socket_handler现在它似乎运行良好。

def add_socket_handler(sock, callback, io_loop):
    def accept_handler(fd, events):
        while True:
            try:
                data, address = sock.recvfrom(1024)
                callback(data, address)
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    raise
            except Exception as e:
                raise
            break ## change in here
    io_loop.add_handler(sock.fileno(), accept_handler, io_loop.READ)

这是应该如何完成的吗?

标签: asynchronoustornadomulticast

解决方案


breakadd_socket_handler向后看。你想循环直到你得到 EWOULDBLOCK/EAGAIN。(按照所写的中断,它仍然可以工作,但效率会稍低,并且可能会丢失数据包)。

def add_socket_handler(sock, callback, io_loop):
    def read_handler(fd, events):
        while True:
            try:
                data, address = sock.recvfrom(1024)
                callback(data, address):
            except socket.error as e:
                if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
    io_loop.add_handler(sock, read_handler, io_loop.READ)

除此之外,这看起来是对的,尽管我自己没有使用多播 UDP。


推荐阅读