node.js - 如何加速我的 nodejs 集群代码?
问题描述
我通过 Node.js 集群模式发送推送通知,但是速度非常慢。我当前的代码需要 5 分钟才能处理完 400 000 条通知。
谁能告诉我如何提高速度?
const webpush = require('web-push');
const cluster = require('cluster');
//const numCPUs = require('os').cpus().length;
const mysql = require('mysql');
// Command-line contract (positions in process.argv):
//   [2] DB host, [3] DB user, [4] DB password, [5] DB name,
//   [6] number of worker processes, [7] max payload rows per run (optional),
//   [8] contact e-mail used as the VAPID subject.

// Number of workers to fork. Converted to a real number so that loop bounds,
// division and strict comparisons below do not rely on string coercion.
const numCPUs = Number(process.argv[6]);
if (!Number.isInteger(numCPUs) || numCPUs < 1) {
  // Without a valid worker count no worker is ever forked and the master
  // would wait forever, so fail fast instead.
  throw new Error(`Worker count (argv[6]) must be a positive integer, got: ${process.argv[6]}`);
}

// Upper bound on rows handled in one run; NaN (argument omitted) means "no limit".
const maxPayloads = Number(process.argv[7]);

// Wall-clock start, used only for the final timing log line.
const begin = Date.now();

// Maximum number of rows sent to MySQL in a single INSERT statement.
const INSERT_BATCH_SIZE = 5000;

/**
 * Creates a MySQL connection from the credentials given on the command line.
 *
 * @returns {object} An unconnected `mysql` connection object.
 */
function createConnection() {
  return mysql.createConnection({
    host: process.argv[2],
    user: process.argv[3],
    password: process.argv[4],
    database: process.argv[5]
  });
}

/**
 * Converts one `web_push_payloads` row into a `[request, payload]` pair for
 * `web_push_queue`.
 *
 * @param {object} message - Row with `request` (JSON text) and `payload`.
 * @returns {string[]} `[requestJson, base64Body]`.
 * @throws If `request` is not valid JSON or web-push rejects the subscription,
 *   payload or options.
 */
function buildQueueRow(message) {
  const request = JSON.parse(message.request);
  const pushSubscription = {
    endpoint: request.endpoint,
    keys: {
      p256dh: request.p256dh,
      auth: request.auth
    }
  };
  const options = {
    // NOTE(review): API key is hard-coded in source; consider loading it from
    // configuration or the environment instead.
    gcmAPIKey: 'AIzaSyBQ-MFenm1byqoT2IsdiSrm0ZK0Eey44uk',
    TTL: request.ttl,
    vapidDetails: {
      subject: `mailto:${process.argv[8]}`,
      publicKey: request.vappubkey,
      privateKey: request.vapprvkey
    }
  };

  const details = webpush.generateRequestDetails(
    pushSubscription,
    message.payload,
    options
  );

  // The body is stored separately (base64) in the `payload` column, so it is
  // blanked out of the JSON that goes into the `request` column.
  const body = details.body.toString('base64');
  details.body = '';
  details.deviceid = request.deviceid;
  details.platid = request.platid;
  details.appid = request.appid;
  details.campid = request.campid;
  details.msg_type = request.msg_type;

  return [JSON.stringify(details), body];
}

/**
 * Master process: counts pending payloads, forks the workers, hands each one
 * its slice size, and deletes the processed payload rows once every worker
 * has reported back.
 */
function runMaster() {
  const con = createConnection();
  let lastInserID = 0;
  let workerCounters = 0;

  con.query('SELECT COUNT(*) AS counter FROM web_push_payloads', (err, result) => {
    if (err) throw err;

    // COUNT(*) without GROUP BY yields exactly one row.
    const counter = result[0].counter;
    const total = Number.isNaN(maxPayloads) ? counter : Math.min(counter, maxPayloads);
    const loop = Math.ceil(total / numCPUs);

    // Fork workers; each receives the number of rows it has to process.
    for (let i = 0; i < numCPUs; i++) {
      cluster.fork().send(loop);
    }
  });

  cluster.on('message', (worker, message) => {
    workerCounters++;
    if (message.lastInserID > lastInserID) {
      lastInserID = message.lastInserID;
    }
    if (workerCounters !== numCPUs) {
      return;
    }

    const end = Date.now();
    con.query('DELETE FROM web_push_payloads WHERE id <= ?', [lastInserID], (err, result) => {
      if (err) throw err;
      console.log(`${result.affectedRows} record(s) deleted`);
      console.log(`All worker are ended ${result.affectedRows} in ${(end - begin) / 1000} secs`);
      con.end();
      // NOTE(review): exit status 200 is kept from the original script.
      process.exit(200);
    });
  });

  // A worker that dies before reporting would leave the master waiting
  // forever; abort instead and leave the payload table untouched.
  cluster.on('exit', (worker, code, signal) => {
    if (code !== 0) {
      console.error(`Worker ${worker.id} died (code=${code}, signal=${signal}); aborting without deleting payloads`);
      con.end();
      process.exit(1);
    }
  });
}

/**
 * Worker process: loads its slice of `web_push_payloads`, converts every row
 * to a ready-to-send request and bulk-inserts the results into
 * `web_push_queue`, then reports the highest processed id to the master.
 *
 * @param {number} loop - Number of rows in this worker's slice.
 */
function runWorker(loop) {
  const con = createConnection();

  // Reports to the master and exits only after the IPC message was handed
  // over, so the report cannot be lost by an early process.exit().
  const finish = (lastInserID) => {
    con.end();
    process.send({ lastInserID }, () => process.exit());
  };

  con.connect((connectErr) => {
    if (connectErr) throw connectErr;

    // Worker ids start at 1, so worker k handles rows [(k-1)*loop, k*loop).
    const start = (cluster.worker.id - 1) * loop;
    con.query('SELECT * FROM web_push_payloads ORDER BY id ASC LIMIT ?, ?', [start, loop], (selectErr, result) => {
      if (selectErr) throw selectErr;
      if (!result.length) {
        finish(0);
        return;
      }

      let batch = [];
      let batchLastId = 0; // id of the most recent row seen, converted or not
      let pending = 0; // INSERT statements issued but not yet answered
      let lastInserID = 0;

      // Sends the accumulated rows as one parameterized bulk INSERT.
      const flush = () => {
        if (batch.length === 0) {
          return;
        }
        const rows = batch;
        const lastId = batchLastId;
        batch = [];
        pending++;
        // `VALUES ?` with a nested array lets the driver escape every value,
        // so quotes or backslashes in the JSON cannot break or alter the row.
        con.query('INSERT INTO web_push_queue (request,payload) VALUES ?', [rows], (insertErr) => {
          pending--;
          if (insertErr) console.error(insertErr);
          // NOTE(review): as in the original, the watermark advances even
          // when the INSERT failed, so the master may delete unqueued rows.
          if (lastId > lastInserID) {
            lastInserID = lastId;
          }
          // Query callbacks only run after the synchronous loop below has
          // issued every batch, so pending === 0 means all work is done.
          if (pending === 0) {
            finish(lastInserID);
          }
        });
      };

      for (const message of result) {
        try {
          batch.push(buildQueueRow(message));
        } catch (rowErr) {
          // A single bad row must not stop the run or skip the final flush.
          console.error(`Skipping payload ${message.id}:`, rowErr);
        }
        batchLastId = message.id;
        if (batch.length >= INSERT_BATCH_SIZE) {
          flush();
        }
      }
      flush();

      // Every row failed conversion: nothing was queued, report no progress.
      if (pending === 0) {
        finish(0);
      }
    });
  });
}

if (cluster.isMaster) {
  runMaster();
} else {
  process.on('message', runWorker);
}
如何提高上述代码速度?
我听说过 PM2,但是在当前代码上使用 PM2 时遇到了困难。
解决方案
推荐阅读
- python - 如何保存作为 jupyter 中函数输出的 matplotlib 图?
- c - 如何在我的 C 程序中纠正这个错误?
- excel - 返回两个日期之间的最新余额
- c# - c#导出文本文件的函数只运行一次
- flutter - 你需要在 dart 中嵌套多深的 const 构造函数?
- kubernetes - 如何设置 EKS 服务器以供 kubectl 对话?
- node.js - Yarn vs NPM 内存消耗
- twilio - 如何获取用户所属的所有对话?
- python - Penney 的博弈概率
- swift - 无法转换类型“(AnyPublisher)的返回表达式
, APIError)' 返回类型 'AnyPublisher '