javascript - node.js 中的多线程/集群,
问题描述
我需要有关节点中集群的帮助。这是程序的骨架
global values a,b,c
function1{
get data from websocket and update the value a,b,c for every incoming message from datastream.
}
function2{
while(1){
do some processing using a,b,c values.
}
}
如果从 websocket 流中获取任何数据,函数 1 会更新一次数据,数据可能随时到达 - 例如有时每秒 100 个数据,有时每 10 秒 2 个数据。因此,无论数据何时到达(即连续),函数 2 都必须运行
我需要并行运行这些功能 1 和 2,就像每个在不同的 cpu 上(或作为集群)
我试图在 上运行async.parallel
,但它在单个线程上运行,所以直到函数 2 中的 while 循环结束,函数 1 才会暂停。所以它不能使用它来完成,所以需要在不同的线程上同时运行。
我看到了那个集群模块,并尝试使用它,但是,看到这个场景
if (cluster.isPrimary) {
function 1()
// creating child process in other thread
cluster.fork();
} else {
function 2()
}
但在这种情况下,子集群无法访问全局值 a、b、c,因为它们是使用函数 1 更新/分配的,因此子集群无法访问主集群的全局变量 a、b、c,而子集群的 a、b、c 将为空,因为它不在子线程上运行函数 1。
我需要的是在不同的线程上并行运行这两个函数。有什么办法可以代替运行 cluster.fork 在不同的线程中运行函数 2,以便它可以访问全局变量 a、b、c。
完整的代码是这样的:
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
var t = 1;
var pca = "1";
var pcb = "1";
var pta = "1";
var ptb = "1";
var datato = {
"method": "SUBSCRIBE",
"params": [
"trxusdc@bookTicker"
],
"id": 1
}
var axios = require('axios');
var configt = {
method: 'get',
url: 'https://api.binance.com/api/v3/ticker/bookTicker?symbol=TRXUSDT',
headers: {}
};
var configc = {
method: 'get',
url: 'https://api.binance.com/api/v3/ticker/bookTicker?symbol=TRXUSDT',
headers: {}
};
async function mymodule_init0() {
axios(configc)
.then(function(response) {
pca = JSON.parse(JSON.stringify(response.data)).askPrice;
pcb = JSON.parse(JSON.stringify(response.data)).bidPrice;
console.log("1");
//--------subaxios for usdt
axios(configt)
.then(function(response) {
pta = JSON.parse(JSON.stringify(response.data)).askPrice;
ptb = JSON.parse(JSON.stringify(response.data)).bidPrice;
console.log("2");
// mymodule_init1();
})
.catch(function(error) {
console.log(error);
});
})
.catch(function(error) {
console.log(error);
});
}
async function mymodule_init1() {
const WS = require('ws')
const ws = new WS('wss://stream.binance.com:9443/ws/trxusdt@bookTicker')
ws.on('message', function incoming(sdata) {
if (JSON.parse(sdata).s == "TRXUSDC") {
pca = JSON.parse(sdata).a;
pcb = JSON.parse(sdata).b;
}
if (JSON.parse(sdata).s == "TRXUSDT") {
pta = JSON.parse(sdata).a;
ptb = JSON.parse(sdata).b;
}
if (t == 1) {
ws.send(JSON.stringify(datato));
t++;
}
});
}
async function mymodule_init2() {
while (1) {
console.log(pca + " \t " + ptb + "\t" + pta + " \t " + pcb );
}
}
setTimeout(function() {
new Promise(resolve => {
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
}
parallel(mymodule_init1(), mymodule_init2());
})
}, 5000);
mymodule_init0();
所以我需要在不同的线程中运行模块 init 2,我的意思是无论主线程中的函数如何,它都应该并行/单独运行
解决方案
推荐阅读
- bash - 无法使用 sudo 创建目录
- autosar - AUTOSAR 中的端口原型定义
- dart - 颤抖我如何在 Dart 中获得标题?
- database - 使用 000webhost 从数据库获取数据的问题
- java - 从jsp传递参数到springboot控制器
- python - 适用于非 Python 2 用户的 Python 3 super()
- angular - 子模块的角度延迟加载不起作用?
- python - 附加 RTF 格式样式
- reactjs - TS 错误:“JSX.IntrinsicElements”类型上不存在属性“./PresentationAttributesForm”
- pm2 - pm2 monit - 有没有办法清除全局日志?