首页 > 解决方案 > 最佳进程间消息传递模式,用于定期监听同一本地主机上的多个进程

问题描述

我有一个奇怪的案例,我需要从我的主进程创建许多进程。

我创建的这些进程会将来自 Web 套接字的一些消息排队。

每隔一段时间,比如每隔一秒左右,我就会从我的主进程中轮询这些小进程。我使用的语言是 D,消息传递库是 ZMQD(它只是 C zmq library 的包装器)。

我的主要流程的最小示例:

Socket*[] socketList;

string sRecv( Socket* socket)
{
    ubyte[256] buffer;
    immutable size = socket.receive(buffer);
    import std.algorithm: min;
    return buffer[0 .. min(size,256)].idup.asString();
}

void startServer( string servername )
{
    auto pid = spawnProcess(["/home/erdem/eclipse-workspace/WebSocketDenemesi/websocketdenemesi",
                              servername, "\n"]);
    auto requester = new Socket(SocketType.req);
    auto allName = "ipc:///tmp/" ~  servername;
    requester.connect(allName);
    socketList ~= requester;

}

void main() {

    import std.array : split;
    import std.algorithm : each;

    startServer("iotabtc@depth");
    startServer("iotabtc@aggTrade");
    startServer("ethbtc@depth");

    int counter = 30;
    while(counter--) {
        foreach ( requester; socketList)
        {
            requester.send("send"); 
        }

        foreach ( requester; socketList)
        {
            auto strList = sRecv(requester).split("\n");
            strList.each!( str => writefln("Received [%d]reply [%s]", strList.length,  str) );

        }
        sleep(1000.msecs);
    }
    foreach ( requester; socketList)
    {
        requester.send("done"); 
    }
}

我的小流程的最小示例:

WebSocket startSocket( string temp )
{
    auto ws_url = URL(temp);
    auto ws = connectWebSocket(ws_url);
    if ( !ws.connected )
        return null;    
    return  ws;
}

void close( WebSocket ws )
{
    int timeOut = 5;
    while ( ws && ws.connected && timeOut-- )
    {
        vibe.core.concurrency.async( { ws.close(); return true;} ); 
        sleep(5.msecs);
    }   
}

string sRecv(ref Socket socket)
{
    ubyte[256] buffer;
    immutable size = socket.tryReceive(buffer)[0];
    import std.algorithm: min;
    return size ? buffer[0 .. min(size,256)].idup.asString() : "";
}

void main( string[] args ) {

    auto responder = Socket(SocketType.rep);
    string seperatorChar = args[2];
    string temp = "ipc:///tmp/" ~ args[1];
    responder.bind(temp);

    string socketName =  "wss://stream.binance.com:9443/ws/" ~ args[1];
    auto curSocket = startSocket(socketName);
    string curString;
    while (true) {
        auto result = responder.sRecv();
        if ( result == "send")
        {
            responder.send(curString);      
            curString = "";
        }
        else if ( result == "done" )
        {
            break;
        }
        else 
        {
            if ( curSocket.dataAvailableForRead )
            {
                auto text = curSocket.receiveText();
                if ( !curString.empty )
                   curString ~= seperatorChar;
                curString ~= text;
            }
        }
        sleep(100.msecs);
    }
    writeln( "Shutting down: ", args[1]);
    curSocket.close();

}

这是我第一次使用这个消息库。这就是我使用简单REQ/REP套接字的原因。有没有更好的方法来实现我的要求。例如,是否有更好的消息传递模式?例如,是否有一种模式,其中我的小进程不会被responder.receive( buffer );.

如果有的话,我就不需要从另一个线程监听 websocket。

标签: cdesign-patternsmessage-queuezeromqd

解决方案


欢迎使用基于 ZeroMQ 的

例如,是否有更好的消息传递模式

这取决于您的进程需要如何通信。简而言之,REQ/REP在阻塞模式下使用几乎是菜单中最糟糕的选项。

  • 鉴于您的 websocket 只接收一条异步信息(这是一种常见方式,市场如何重新广播事件流),纯ws.recv()+ PUSHer.send()+if PULLer.poll(): PULLer.recv()流水线事件获取 +PUSH/PULL传播 + 条件重新处理将最符合现实世界行为。

  • 鉴于您的处理场足迹可能会超出单个本地主机,其他传输类,对于非本地节点〜{ tipc:// | tcp:// | udp:// | pgm:// | epgm:// | norm:// | vmci:// }可能会进入游戏,与ipc://您当前本地主机上的 -links 一起 - ZeroMQ 处理这种混合的透明度是一个很酷的好处开始掌握零之禅。

  • 鉴于延迟在大规模处理分布中至关重要,PUB/SUB可扩展的正式通信原型模式可能会变得有益,可以选择.setsockopt( zmq.CONFLATE, 1 )用于非日志节点,其中只有最近的价格与采取任何类型的任何响应式 XTO 操作相关种。


推荐阅读