首页 > 解决方案 > ZeroMQ PUB/SUB 拓扑在同一台机器上

问题描述

发布者是否可以使用 ZeroMQ 发布到同一台机器上的多个客户端?我想要一组客户端,每个客户端都可以使用 SocketType 进行标准的请求/响应调用。REQ和 SocketType。REP,但它也可以使用 SocketType 接收通知。SUB和 SocketType。酒吧

尽管我的版本只有一个发布者,但我已尝试实现此拓扑,取自此处。

在此处输入图像描述

这是我的出版商:

public class ZMQServerSmall 
{   
    public static void main(String[] args)
    {
        try (ZContext context = new ZContext()) 
        {           
            ZMQ.Socket rep = context.createSocket(SocketType.REP);
            rep.bind("tcp://*:5555");

            ZMQ.Socket pub = context.createSocket(SocketType.PUB);
            pub.bind("tcp://*:7777");           

            while (!Thread.currentThread().isInterrupted()) 
            {                   
                String req = rep.recvStr(0);
                rep.send(req + " response");

                pub.sendMore("Message header");
                pub.send("Message body");;          
            }
        }
    }
}

这是我的代理(我包括了一个侦听器来尝试查看发生了什么):

public class ZMQForwarderSmall 
{
    public static void main(String[] args) 
    {       
        try 
        (
            ZContext context = new ZContext();   
        )
        {
            ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);            
            frontend.connect("tcp://*:7777");

            ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
            backend.bind("tcp://*:6666");

            IAttachedRunnable runnable = new Listener();
            Socket listener = ZThread.fork(context, runnable);

            ZMQ.proxy(frontend, backend, listener);
        }
        catch (Exception e) 
        {
            System.err.println(e.getMessage());
        } 
    }

    private static class Listener implements IAttachedRunnable 
    {    
        @Override
        public void run(Object[] args, ZContext ctx, Socket pipe) 
        {
            while (true) 
            {
                ZFrame frame = ZFrame.recvFrame(pipe);
                if (frame == null)
                    break; // Interrupted
                System.out.println(frame.toString());
                frame.destroy();
            }
        }
    }
}

这是我的订阅者:

public class ZMQClientSmall
{   
    public static void main(String[] args) throws IOException
    {
        String input;

        try 
        (
            ZContext context = new ZContext();
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
        ) 
        { 
            ZMQ.Socket reqSocket = context.createSocket(SocketType.REQ);
            reqSocket.connect("tcp://localhost:5555");

            ZMQ.Socket subSocket = context.createSocket(SocketType.SUB);
            subSocket.connect("tcp://localhost:6666");

            subSocket.subscribe("".getBytes(ZMQ.CHARSET));

            while ((input = stdIn.readLine()) != null)
            {
                reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
                String response = reqSocket.recvStr(0);

                String address = subSocket.recvStr(ZMQ.DONTWAIT);
                String contents = subSocket.recvStr(ZMQ.DONTWAIT);
                System.out.println("Notification received: " + address + " : " + contents);
            }
        }   
    }   
}

这是测试。我打开四个终端;1 个发布者、1 个代理和 2 个客户端。当我在两个客户端中的任何一个中发出请求时,我希望在两者中都看到通知,但我只在发出请求的终端中看到通知。我知道两个客户端都使用相同的地址(localhost:6666),但我希望代理能解决这个问题。

任何人都可以在这里看到明显的错误吗?

标签: javazeromqdistributed-computingpublish-subscribedistributed-system

解决方案


发布者是否可以使用 ZeroMQ 发布到同一台机器上的多个客户端?

哦,当然,是的。对此毫无疑问。


检查代码。执行顺序的责任就在那里。在总是如此。

一旦[Client]-No1实例得到一个似是而非的.readLine()-ed input,它就会跳入:

        while ((input = stdIn.readLine()) != null)
        {
            reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
            String response = reqSocket.recvStr(0);

            String address = subSocket.recvStr(ZMQ.DONTWAIT);
            String contents = subSocket.recvStr(ZMQ.DONTWAIT);
            System.out.println(           "Notification received: "
                              + address + " : "
                              + contents
                                );
        }

接下来它.send()结束REQ并阻塞(等待REP响应)

鉴于该[Client]-No2实例还获得了一个合理的手动.readLine()编辑input,它会跳入相同的while(){...},但它不会继续进行,而是再次阻塞等待REP响应。这不会在.recv()任何时候得到 -ed,但在-No1REP-side 得到服务之后,所以虽然-No1可能已经摆脱了阻塞.recv()- ,但不是这样-No2( 它仍然会挂在它的阻塞.recv()- 中,以进行任何下一个REP-side 响应 (可能会来但不需要),而No1已经进行到PUB/SUB-.recv(),它将接收(但永远不会),下No2一个冲入下一个阻塞input-从.readLine() 等等,等等,等等,......,无限

因此,在任何数量的实例中,这些SEQ循环中的 ( REQ) 部分后跟 ( ) 部分,有效地生成了一个 EXCLUSIVE Tick-Tock-Tick-Tock 时钟机器,相互阻止了-以-interleaved 顺序编辑(不涉及手动、-驱动、阻塞步骤)SUBN > 1[Client]PUBN.readLine()

ZMQServerSmall不知道任何错误,因为它.send()-s 是为了任何.recvStr()-ed 交易对手,REQ/REP而-sPUB是所有交易对手(不自主读取,但只有在 ' 被手动.readLine()解锁之后,然后才(在REQ/REP偶发之后(可能无限阻塞)步骤)可能.recv()是它的下一个(到目前为止还没有读取消息部分(但是,我没有看到任何代码可以显式处理SUB-side.recv()操作上的多部分标志的存在/不存在)

        while (!Thread.currentThread().isInterrupted()) 
        {                   
            String req = rep.recvStr(0);
            rep.send(req + " response");

            pub.sendMore("Message header");
            pub.send("Message body");;          
        }

同时ZMQServerSmall在 -broadcast 通道上发送 ( N - 1 ) 倍的消息PUB,因此 Tick-Tock-Tick-Tock MUTEX REQ/SUB-loop-blocking "pendulum" 不是 2-State,而是N接收方的 -State (都收到相同的PUB-ed 消息流,
但由MUTEX - stepping的 N 步交错REQ/REP


推荐阅读