首页 > 解决方案 > 如何将 Python ZMQ 链接到标准 UDP 或 TCP 服务器(旧版,使用 C++ 编写 ~ 15 年前)

问题描述

我需要在用 Python 编写的包(PsychoPy,用于心理/行为实验)和可以使用 UDP 或 TCP 的旧版 C++ 软件之间进行通信。特别是,在 Python / PsychoPy 方面,我需要一个像 Matlab 的 pnet() 这样的异步进程,它将轮询套接字以查看它是否有要读取的数据,如果有则处理数据,或者如果没有则继续。

ZMQ是推荐给我的;但是我看到的所有使用 zmq_polling 的示例代码都假设发送和接收都是使用 ZMQ 协议进行的。是否有一些简单的 Python ZMQ 代码连接到非 zmq TCP 或 UDP 源,并且如果没有数据要读取,是否会轮询以检查数据是否存在而不会挂断?

谢谢阿尼鲁达

import zmq
import time


# Prepare our context and sockets
context = zmq.Context()

# Bind socket to local host
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://129.236.162.112:55513")
#print( "Connected to server with port %s" % port_push)

# Initialize poll set
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)



# Process messages from socket
while True:
    print('Entered the queue')
    try:
        socks = dict(poller.poll())
    except KeyboardInterrupt:
        break

    if receiver in socks:
        message = receiver.recv()
        # process task
        print(repr(message))


    else:
        print('Nothing to show')

    time.sleep(0.01)

我可以从传统的 C++ 机器发送小的 TCP 数据包;他们在没有任何错误消息的情况下被发送出去,这意味着没有问题。但是这段 Python 代码没有任何反应

上面的代码进入了'try'并停留在那里。

如何访问错误/状态消息以进行调试?

谢谢阿尼鲁达

标签: pythonzeromqpyzmq

解决方案


欢迎来到零之禅: 是的,这是可能的

你的帖子一次问了很多问题。让我们从一个到另一个。


Q1如何访问错误/状态消息进行调试?

ZeroMQ 文档为此提供了工具。C 端绑定的共同点是接收显式返回代码,可以通过以下方式进行检查,assert()此外还可以从以下位置检索更多详细信息errno

void *ctx    = zmq_ctx_new();                     assert( ctx     && "EXC: Context failed to instantiate" );
void *socket = zmq_socket( ctx, ZMQ_STREAM );     assert( socket  && "EXC: Socket failed to instantiate" );
int   rc     = zmq_bind( socket, "tcp://*:8080" );assert( rc == 0 && "EXC: Bind failed to setup a Transport-Class" );

Q2+3 :是否有一些简单的 Python ZMQ 代码连接到非 zmq TCP 或 UDP 源(2),并进行轮询(3)以检查数据是否存在,如果没有数据可读取,则不会挂断?

为此目的(2),ZeroMQ 框架在 3.2+ 版左右配备了 STREAM Scalable Formal Communication Pattern Archetype。如果不确定 ZeroMQ 架构如何使用 Context、Context 的 Socket(s) 的 Archetypes、Socket 的 Transport-Class AccessPoint(s),在深入了解更多细节之前,您可能喜欢简短阅读“ZeroMQ原则在不到 5 秒内关于 ZeroMQ

使用传输时,类型的套接字ZMQ_STREAM用于从非 ØMQ 对等方发送和接收 TCP 数据tcp://。套接字可以ZMQ_STREAM充当客户端和/或服务器,异步发送和/或接收 TCP 数据。

接收 TCP 数据时,ZMQ_STREAM套接字应在将消息传递给应用程序之前,在消息前面添加一个包含原始对等方身份的消息部分。收到的消息在所有连接的对等点之间公平排队。

发送 TCP 数据时,ZMQ_STREAM套接字应删除消息的第一部分并使用它来确定消息应路由到的对等方的身份,不可路由的消息将导致错误EHOSTUNREACHEAGAIN错误。

要打开与服务器的连接,请使用zmq_connect调用,然后使用ZMQ_IDENTITY zmq_getsockopt调用获取套接字标识。

要关闭特定连接,请发送身份帧,后跟零长度消息(请参阅示例部分)。

建立连接后,应用程序将收到一条长度为零的消息。同样,当对端断开连接(或连接丢失)时,应用程序将收到一条长度为零的消息。

您必须先发送一个身份帧,然后再发送一个数据帧。该ZMQ_SNDMORE标志对于标识帧是必需的,但在数据帧上被忽略。

轮询(3)的使用有两个前提:永远不要使用任何方法的阻塞模式.recv()。ZeroMQ 有标志告诉方法不要阻塞:zmq.NOBLOCK在 python 端。另外,围绕非阻塞形式设计python代码.poll()或使用.Poller()-instance。

例子:

import zmq;                           print( zmq.zmq_version() ) # self-identify
aContext = zmq.Context();             print( "Context()", " instantiated." if zmq.zmq_errno() == 0 else " failed [#{}]".format( zmq.strerror( zmq.zmq_errno() ) ) )

aXmitSOCKET = aContext.socket( zmq.PUSH   ); aXmitSOCKET.setsockopt( zmq.LINGER, 0 ); ...
aCtrlSOCKET = aContext.socket( zmq.STREAM ); aCtrlSOCKET.setsockopt( zmq.LINGER, 0 ); ...

while True:
      if ( 0 == aXmitSOCKET.poll(  200, zmq.POLLIN ) ): # ~ 200 msec WAIT
         # ---------------------------------------------[aXmitPORT].hasNoIncomingMSG
         aCountDownREG -= 1                             #.DEC CDOWN as XmitPORT has no incoming DataToPREDICT_MSG
         aCountUpREG   += 1                             #.INC CNTUP
         if ( 0 == aCtrlSOCKET.poll( 1, zmq.POLLIN ) ): # ~   1 msec WAIT
            # ---------------------------------------------[aCtrlPORT].hasNoIncomingMSG
            ...
         else:                                          #
            # ---------------------------------------------[aCtrlPORT].hasAnIncomingMSG
            idF,aCMD = aCtrlSOCKET.recv_multipar( zmq.NOBLOCK )  # .recv()<-MSG as CtrlPORT has an incoming COMMAND_MSG
            ...
#--------------
# finally:
_ = [ aCtrlSOCKET.send_multipart( [ anIdentityFRAME, "" ], zmq.NOBLOCK ) for anIdentityFRAME in aListOfIdFRAMEs ]
aCtrlSOCKET.close()
aXmitSOCKET.close()
#--------------
# always:
aContext.term()

随意检查方法的实时文档:

>>> print( aCtrlSOCKET.recv_multipart.__doc__ )
Receive a multipart message as a list of bytes or Frame objects

        Parameters
        ----------
        flags : int, optional
            Any valid flags for :func:`Socket.recv`.
        copy : bool, optional
            Should the message frame(s) be received in a copying or non-copying manner?
            If False a Frame object is returned for each part, if True a copy of
            the bytes is made for each frame.
        track : bool, optional
            Should the message frame(s) be tracked for notification that ZMQ has
            finished with it? (ignored if copy=True)

        Returns
        -------
        msg_parts : list
            A list of frames in the multipart message; either Frames or bytes,
            depending on `copy`.

        Raises
        ------
        ZMQError
            for any of the reasons :func:`~Socket.recv` might fail

>>> print( aCtrlSOCKET.send_multipart.__doc__ )
Send a sequence of buffers as a multipart message.

        The zmq.SNDMORE flag is added to all msg parts before the last.

        Parameters
        ----------
        msg_parts : iterable
            A sequence of objects to send as a multipart message. Each element
            can be any sendable object (Frame, bytes, buffer-providers)
        flags : int, optional
            Any valid flags for :func:`Socket.send`.
            SNDMORE is added automatically for frames before the last.
        copy : bool, optional
            Should the frame(s) be sent in a copying or non-copying manner.
            If copy=False, frames smaller than self.copy_threshold bytes
            will be copied anyway.
        track : bool, optional
            Should the frame(s) be tracked for notification that ZMQ has
            finished with it (ignored if copy=True).

        Returns
        -------
        None : if copy or not track
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the last send is completed.

推荐阅读