c - 最佳进程间消息传递模式,用于定期监听同一本地主机上的多个进程
问题描述
我有一个奇怪的案例,我需要从我的主进程创建许多进程。
我创建的这些进程会将来自 Web 套接字的一些消息排队。
每隔一段时间,比如每隔一秒左右,我就会从我的主进程中轮询这些小进程。我使用的语言是 D,消息传递库是 ZMQD(它只是 C zmq library 的包装器)。
我的主要流程的最小示例:
Socket*[] socketList;
string sRecv( Socket* socket)
{
ubyte[256] buffer;
immutable size = socket.receive(buffer);
import std.algorithm: min;
return buffer[0 .. min(size,256)].idup.asString();
}
void startServer( string servername )
{
auto pid = spawnProcess(["/home/erdem/eclipse-workspace/WebSocketDenemesi/websocketdenemesi",
servername, "\n"]);
auto requester = new Socket(SocketType.req);
auto allName = "ipc:///tmp/" ~ servername;
requester.connect(allName);
socketList ~= requester;
}
void main() {
import std.array : split;
import std.algorithm : each;
startServer("iotabtc@depth");
startServer("iotabtc@aggTrade");
startServer("ethbtc@depth");
int counter = 30;
while(counter--) {
foreach ( requester; socketList)
{
requester.send("send");
}
foreach ( requester; socketList)
{
auto strList = sRecv(requester).split("\n");
strList.each!( str => writefln("Received [%d]reply [%s]", strList.length, str) );
}
sleep(1000.msecs);
}
foreach ( requester; socketList)
{
requester.send("done");
}
}
我的小流程的最小示例:
WebSocket startSocket( string temp )
{
auto ws_url = URL(temp);
auto ws = connectWebSocket(ws_url);
if ( !ws.connected )
return null;
return ws;
}
void close( WebSocket ws )
{
int timeOut = 5;
while ( ws && ws.connected && timeOut-- )
{
vibe.core.concurrency.async( { ws.close(); return true;} );
sleep(5.msecs);
}
}
string sRecv(ref Socket socket)
{
ubyte[256] buffer;
immutable size = socket.tryReceive(buffer)[0];
import std.algorithm: min;
return size ? buffer[0 .. min(size,256)].idup.asString() : "";
}
void main( string[] args ) {
auto responder = Socket(SocketType.rep);
string seperatorChar = args[2];
string temp = "ipc:///tmp/" ~ args[1];
responder.bind(temp);
string socketName = "wss://stream.binance.com:9443/ws/" ~ args[1];
auto curSocket = startSocket(socketName);
string curString;
while (true) {
auto result = responder.sRecv();
if ( result == "send")
{
responder.send(curString);
curString = "";
}
else if ( result == "done" )
{
break;
}
else
{
if ( curSocket.dataAvailableForRead )
{
auto text = curSocket.receiveText();
if ( !curString.empty )
curString ~= seperatorChar;
curString ~= text;
}
}
sleep(100.msecs);
}
writeln( "Shutting down: ", args[1]);
curSocket.close();
}
这是我第一次使用这个消息库。这就是我使用简单REQ/REP
套接字的原因。有没有更好的方法来实现我的要求。例如,是否有更好的消息传递模式?例如,是否有一种模式,其中我的小进程不会被responder.receive( buffer );
.
如果有的话,我就不需要从另一个线程监听 websocket。
解决方案
欢迎使用基于 ZeroMQ 的分布式计算
例如,是否有更好的消息传递模式?
这取决于您的进程需要如何通信。简而言之,REQ/REP
在阻塞模式下使用几乎是菜单中最糟糕的选项。
鉴于您的 websocket 只接收一条异步信息(这是一种常见方式,市场如何重新广播事件流),纯
ws.recv()
+PUSHer.send()
+if PULLer.poll(): PULLer.recv()
流水线事件获取 +PUSH/PULL
传播 + 条件重新处理将最符合现实世界行为。鉴于您的处理场足迹可能会超出单个本地主机,其他传输类,对于非本地节点〜
{ tipc:// | tcp:// | udp:// | pgm:// | epgm:// | norm:// | vmci:// }
可能会进入游戏,与ipc://
您当前本地主机上的 -links 一起 - ZeroMQ 处理这种混合的透明度是一个很酷的好处开始掌握零之禅。鉴于延迟在大规模处理分布中至关重要,
PUB/SUB
可扩展的正式通信原型模式可能会变得有益,可以选择.setsockopt( zmq.CONFLATE, 1 )
用于非日志节点,其中只有最近的价格与采取任何类型的任何响应式 XTO 操作相关种。
推荐阅读
- c# - C# 抛出然后继续返回
- node.js - 如何从 Node 中的块中提取 From、To、Subject
- angular - NgSwitch - 如何通过单击按钮禁用其他按钮
- mysql - JPQL 未在服务器上运行
- angular - 构建 Angular 库时是否可以使用自定义 Webpack 配置?
- shell - 使用`sed`包装字符串
- php - Symfony Messenger 中没有配置处理程序
- android-webview - 动态图像无法加载,显示为 text/html MIME 类型,修复 webview?
- django - Django url 渲染
- flutter - 如果值等于其他值,则 Dart 删除项目表单列表