首页 > 解决方案 > node.js 中的多线程/集群,

问题描述

我需要有关节点中集群的帮助。这是程序的骨架

global values a,b,c

function1{

get data from websocket and update the value a,b,c for every incoming message from datastream.

}

function2{
    while(1){

        do some processing using a,b,c values.
    }
}

如果从 websocket 流中获取任何数据,函数 1 会更新一次数据,数据可能随时到达 - 例如有时每秒 100 个数据,有时每 10 秒 2 个数据。因此,无论数据何时到达(即连续),函数 2 都必须运行

我需要并行运行这些功能 1 和 2,就像每个在不同的 cpu 上(或作为集群)

我试图在 上运行async.parallel,但它在单个线程上运行,所以直到函数 2 中的 while 循环结束,函数 1 才会暂停。所以它不能使用它来完成,所以需要在不同的线程上同时运行。

我看到了那个集群模块,并尝试使用它,但是,看到这个场景

if (cluster.isPrimary) {

    function 1()

    // creating child process in other thread
    cluster.fork();
 
} else {

    function 2()
}

但在这种情况下,子集群无法访问全局值 a、b、c,因为它们是使用函数 1 更新/分配的,因此子集群无法访问主集群的全局变量 a、b、c,而子集群的 a、b、c 将为空,因为它不在子线程上运行函数 1。

我需要的是在不同的线程上并行运行这两个函数。有什么办法可以代替运行 cluster.fork 在不同的线程中运行函数 2,以便它可以访问全局变量 a、b、c。

完整的代码是这样的:

const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

var t = 1;
var pca = "1";
var pcb = "1";
var pta = "1";
var ptb = "1";
var datato = {
    "method": "SUBSCRIBE",
    "params": [
        "trxusdc@bookTicker"
    ],
    "id": 1
}
var axios = require('axios');
var configt = {
    method: 'get',
    url: 'https://api.binance.com/api/v3/ticker/bookTicker?symbol=TRXUSDT',
    headers: {}
};
var configc = {
    method: 'get',
    url: 'https://api.binance.com/api/v3/ticker/bookTicker?symbol=TRXUSDT',
    headers: {}
};
async function mymodule_init0() {
    axios(configc)
        .then(function(response) {
            pca = JSON.parse(JSON.stringify(response.data)).askPrice;
            pcb = JSON.parse(JSON.stringify(response.data)).bidPrice;
            console.log("1");

            //--------subaxios for usdt
            axios(configt)
                .then(function(response) {
                    pta = JSON.parse(JSON.stringify(response.data)).askPrice;
                    ptb = JSON.parse(JSON.stringify(response.data)).bidPrice;
                    console.log("2");
                    // mymodule_init1();
                })
                .catch(function(error) {
                    console.log(error);
                });


        })
        .catch(function(error) {
            console.log(error);
        });

}


async function mymodule_init1() {

    const WS = require('ws')
    const ws = new WS('wss://stream.binance.com:9443/ws/trxusdt@bookTicker')

    ws.on('message', function incoming(sdata) {

        if (JSON.parse(sdata).s == "TRXUSDC") {
            pca = JSON.parse(sdata).a;
            pcb = JSON.parse(sdata).b;
        }
        if (JSON.parse(sdata).s == "TRXUSDT") {
            pta = JSON.parse(sdata).a;
            ptb = JSON.parse(sdata).b;
        }



        if (t == 1) {
            ws.send(JSON.stringify(datato));
            t++;
        }











    });

}


async function mymodule_init2() {





    while (1) {

        console.log(pca + " \t " + ptb + "\t" + pta + " \t " + pcb );

    }



}




setTimeout(function() {
    new Promise(resolve => {


        if (cluster.isPrimary) {
            console.log(`Primary ${process.pid} is running`);

            // Fork workers.
            for (let i = 0; i < numCPUs; i++) {
                cluster.fork();
            }

            cluster.on('exit', (worker, code, signal) => {
                console.log(`worker ${worker.process.pid} died`);
            });
        }
        parallel(mymodule_init1(), mymodule_init2());


    })
}, 5000);

mymodule_init0();

所以我需要在不同的线程中运行模块 init 2,我的意思是无论主线程中的函数如何,它都应该并行/单独运行

标签: javascriptnode.jsmultithreadingparallel-processingnode-cluster

解决方案


推荐阅读