java - ZeroMQ PUB/SUB 拓扑在同一台机器上
问题描述
发布者是否可以使用 ZeroMQ 发布到同一台机器上的多个客户端?我想要一组客户端,每个客户端都可以使用 SocketType 进行标准的请求/响应调用。REQ和 SocketType。REP,但它也可以使用 SocketType 接收通知。SUB和 SocketType。酒吧。
尽管我的版本只有一个发布者,但我已尝试实现此拓扑,取自此处。
这是我的出版商:
public class ZMQServerSmall
{
public static void main(String[] args)
{
try (ZContext context = new ZContext())
{
ZMQ.Socket rep = context.createSocket(SocketType.REP);
rep.bind("tcp://*:5555");
ZMQ.Socket pub = context.createSocket(SocketType.PUB);
pub.bind("tcp://*:7777");
while (!Thread.currentThread().isInterrupted())
{
String req = rep.recvStr(0);
rep.send(req + " response");
pub.sendMore("Message header");
pub.send("Message body");;
}
}
}
}
这是我的代理(我包括了一个侦听器来尝试查看发生了什么):
public class ZMQForwarderSmall
{
public static void main(String[] args)
{
try
(
ZContext context = new ZContext();
)
{
ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);
frontend.connect("tcp://*:7777");
ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
backend.bind("tcp://*:6666");
IAttachedRunnable runnable = new Listener();
Socket listener = ZThread.fork(context, runnable);
ZMQ.proxy(frontend, backend, listener);
}
catch (Exception e)
{
System.err.println(e.getMessage());
}
}
private static class Listener implements IAttachedRunnable
{
@Override
public void run(Object[] args, ZContext ctx, Socket pipe)
{
while (true)
{
ZFrame frame = ZFrame.recvFrame(pipe);
if (frame == null)
break; // Interrupted
System.out.println(frame.toString());
frame.destroy();
}
}
}
}
这是我的订阅者:
public class ZMQClientSmall
{
public static void main(String[] args) throws IOException
{
String input;
try
(
ZContext context = new ZContext();
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
)
{
ZMQ.Socket reqSocket = context.createSocket(SocketType.REQ);
reqSocket.connect("tcp://localhost:5555");
ZMQ.Socket subSocket = context.createSocket(SocketType.SUB);
subSocket.connect("tcp://localhost:6666");
subSocket.subscribe("".getBytes(ZMQ.CHARSET));
while ((input = stdIn.readLine()) != null)
{
reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
String response = reqSocket.recvStr(0);
String address = subSocket.recvStr(ZMQ.DONTWAIT);
String contents = subSocket.recvStr(ZMQ.DONTWAIT);
System.out.println("Notification received: " + address + " : " + contents);
}
}
}
}
这是测试。我打开四个终端;1 个发布者、1 个代理和 2 个客户端。当我在两个客户端中的任何一个中发出请求时,我希望在两者中都看到通知,但我只在发出请求的终端中看到通知。我知道两个客户端都使用相同的地址(localhost:6666),但我希望代理能解决这个问题。
任何人都可以在这里看到明显的错误吗?
解决方案
问:发布者是否可以使用 ZeroMQ 发布到同一台机器上的多个客户端?
哦,当然,是的。对此毫无疑问。
检查代码。执行顺序的责任就在那里。在分布式系统中总是如此。
一旦[Client]-No1
实例得到一个似是而非的.readLine()
-ed input
,它就会跳入:
while ((input = stdIn.readLine()) != null)
{
reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
String response = reqSocket.recvStr(0);
String address = subSocket.recvStr(ZMQ.DONTWAIT);
String contents = subSocket.recvStr(ZMQ.DONTWAIT);
System.out.println( "Notification received: "
+ address + " : "
+ contents
);
}
接下来它.send()
结束REQ
并阻塞(等待REP
响应)
鉴于该[Client]-No2
实例还获得了一个合理的手动.readLine()
编辑input
,它会跳入相同的while(){...}
,但它不会继续进行,而是再次阻塞等待REP
响应。这不会在.recv()
任何时候得到 -ed,但在-No1
从REP
-side 得到服务之后,所以虽然-No1
可能已经摆脱了阻塞.recv()
- ,但不是这样-No2
( 它仍然会挂在它的阻塞.recv()
- 中,以进行任何下一个REP
-side 响应 (可能会来但不需要),而No1
已经进行到PUB/SUB-.recv()
,它将接收(但永远不会),下No2
一个冲入下一个阻塞input
-从.readLine()
等等,等等,等等,......,无限
因此,在任何数量的实例中,这些SEQ
循环中的 ( REQ
) 部分后跟 ( ) 部分,有效地生成了一个 EXCLUSIVE Tick-Tock-Tick-Tock 时钟机器,相互阻止了-以-interleaved 顺序编辑(不涉及手动、-驱动、阻塞步骤)SUB
N > 1
[Client]
PUB
N
.readLine()
ZMQServerSmall
不知道任何错误,因为它.send()
-s 是为了任何.recvStr()
-ed 交易对手,REQ/REP
而-sPUB
是所有交易对手(不自主读取,但只有在 ' 被手动.readLine()
解锁之后,然后才(在REQ/REP
偶发之后(可能无限阻塞)步骤)可能.recv()
是它的下一个(到目前为止还没有读取消息部分(但是,我没有看到任何代码可以显式处理SUB
-side.recv()
操作上的多部分标志的存在/不存在)
while (!Thread.currentThread().isInterrupted())
{
String req = rep.recvStr(0);
rep.send(req + " response");
pub.sendMore("Message header");
pub.send("Message body");;
}
同时ZMQServerSmall
在 -broadcast 通道上发送 ( N - 1 ) 倍的消息PUB
,因此 Tick-Tock-Tick-Tock MUTEX REQ/SUB
-loop-blocking "pendulum" 不是 2-State,而是N
接收方的 -State (都收到相同的PUB
-ed 消息流,
但由MUTEX - stepping的 N 步交错REQ/REP
)
推荐阅读
- javascript - 外部服务器上的 Cookie
- python - 创建一个 css 选择器以单次定位多个 id
- python - pd.read_sql_query 单/双引号格式化
- java - 使用 RxJava 在循环中链接异步方法
- html - 根据孩子的大小自动调整父母的高度(弹性元素)
- go - 在 Go 中,接口是否仅由 struct 数据类型实现?
- amazon-web-services - 通过空运行 API 调用验证 AWS IAM 策略
- c# - EWS:试图找到唯一的文件夹 ID
- android - 如何在android中根据谷歌地图不同的缩放级别调整谷歌地图标记的大小
- javascript - Bad rendering for borders of circles on a game in Canvas