c++ - 跨多个程序实例的 ZeroMQ IPC
问题描述
我在程序的几个实例之间的 ZMQ 中的进程间通信存在一些问题
- 我正在使用 Linux 操作系统
- 我正在使用 zeromq/cppzmq,libzmq 的仅标头 C++ 绑定
如果我运行这个应用程序的两个实例(比如在终端上),我会为一个实例提供一个作为侦听器的参数,然后为另一个提供一个作为发送者的参数。侦听器永远不会收到消息。我试过 TCP 和 IPC 无济于事。
#include <zmq.hpp>
#include <string>
#include <iostream>
int ListenMessage();
int SendMessage(std::string str);
zmq::context_t global_zmq_context(1);
int main(int argc, char* argv[] ) {
std::string str = "Hello World";
if (atoi(argv[1]) == 0) ListenMessage();
else SendMessage(str);
zmq_ctx_destroy(& global_zmq_context);
return 0;
}
int SendMessage(std::string str) {
assert(global_zmq_context);
std::cout << "Sending \n";
zmq::socket_t publisher(global_zmq_context, ZMQ_PUB);
assert(publisher);
int linger = 0;
int rc = zmq_setsockopt(publisher, ZMQ_LINGER, &linger, sizeof(linger));
assert(rc==0);
rc = zmq_connect(publisher, "tcp://127.0.0.1:4506");
if (rc == -1) {
printf ("E: connect failed: %s\n", strerror (errno));
return -1;
}
zmq::message_t message(static_cast<const void*> (str.data()), str.size());
rc = publisher.send(message);
if (rc == -1) {
printf ("E: send failed: %s\n", strerror (errno));
return -1;
}
return 0;
}
int ListenMessage() {
assert(global_zmq_context);
std::cout << "Listening \n";
zmq::socket_t subscriber(global_zmq_context, ZMQ_SUB);
assert(subscriber);
int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
assert(rc==0);
int linger = 0;
rc = zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger));
assert(rc==0);
rc = zmq_bind(subscriber, "tcp://127.0.0.1:4506");
if (rc == -1) {
printf ("E: bind failed: %s\n", strerror (errno));
return -1;
}
std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
while (true) {
zmq::message_t rx_msg;
// when timeout (the third argument here) is -1,
// then block until ready to receive
std::cout << "Still Listening before poll \n";
zmq::poll(p.data(), 1, -1);
std::cout << "Found an item \n"; // not reaching
if (p[0].revents & ZMQ_POLLIN) {
// received something on the first (only) socket
subscriber.recv(&rx_msg);
std::string rx_str;
rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
std::cout << "Received: " << rx_str << std::endl;
}
}
return 0;
}
如果我使用两个线程运行程序的一个实例,则此代码将起作用
std::thread t_sub(ListenMessage);
sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
std::thread t_pub(SendMessage str);
t_pub.join();
t_sub.join();
但我想知道为什么在运行程序的两个实例时上面的代码不起作用?
谢谢你的帮助!
解决方案
如果您从未使用过 ZeroMQ,
您可能会喜欢先看看“ZeroMQ原则在不到五秒内”
,然后再深入了解更多细节
问:想知道为什么在运行程序的两个实例时上面的代码不起作用?
这段代码永远不会飞thread
- 它与基于 - 或process
基于- 的[CONCURENT]
处理无关。
它是由内部进程通信的错误设计引起的。
ZeroMQ 可以提供任何一种受支持的传输类:{ ipc:// | tipc:// | tcp:// | norm:// | pgm:// | epgm:// | vmci:// }
加上为进程内通信提供更智能的inproc://
传输类,为线程间通信准备好传输类,其中无堆栈通信可能享有最低的延迟,只是一个内存映射策略。
为交互过程通信选择基于L3 / L2的网络堆栈是可能的,但也是最“昂贵”的选项。
核心错误:
鉴于这种选择,任何单个进程(不是说一对进程)都会在尝试将其接入点连接到相同的TCP/IP时发生冲突-.bind()
address:port#
另一个错误:
即使是为了启动一个单独的程序,两个生成的线程都尝试访问.bind()
其私有AccessPoint,但没有一个尝试尝试.connect()
匹配的“相反” AccessPoint。
至少要成功.bind()
,
至少要成功.connect()
,才能得到一个“通道”,这里是PUB/SUB
Archetype。
去做:
- 决定一个适当的、足够正确的传输类(最好避免为 localhost/in-process IPC 操作完整的 L3/L2 堆栈而过度杀伤力)
- 重构
Address:port#
管理(对于 2+ 个进程不会在.bind()
-(s) 上失败)相同(硬连线)address:port#
- 始终检测并适当地处理
{PASS|FAIL}
从 API 调用返回的 -s - 总是
LINGER
显式设置为零(你永远不知道)
推荐阅读
- java - 递归方法中的多个返回
- c# - 在c#中将字符串拆分为字符串数组
- javascript - 使用 JS 和 Rails 为全文用户搜索动态插入内容
- c# - 如何在 Xamarin 表单中与其他 UI 元素一起使用 ZXing.Net?
- r - 如何选择多行
- c# - C# Bindingsource 过滤
- git - 如何撤消多个“git commit --amend”或为每个修改提交获取差异?
- linq-to-sql - 方法不支持对 SQL 的转换(Linq to Sql)
- python - 在 Google App Engine 上运行 Python37 的问题
- javascript - 时刻时间戳 === UTC?