首页 > 解决方案 > 如何加速我的 nodejs 集群代码?

问题描述

我通过 Node.js 的集群(cluster)模式发送推送通知,但是速度非常慢:我当前的代码处理 400,000 条通知需要 5 分钟。

谁能告诉我如何提高速度?

const webpush = require('web-push');
const cluster = require('cluster');
//const numCPUs = require('os').cpus().length;
const mysql = require('mysql');

// Number of worker processes to fork, read from process.argv[6].
// Parsed to an integer once so the fork loop, the per-worker slice size and
// the "all workers reported" comparison all see the same whole number instead
// of relying on implicit string coercion. A missing/non-numeric argument
// yields NaN, which (like the previous undefined) forks no workers.
const numCPUs = Number.parseInt(process.argv[6], 10);

// Wall-clock start (ms since epoch); used for the elapsed-time log line.
const begin = Date.now();

if (cluster.isMaster) {
  // Master-side bookkeeping shared by the callbacks registered below.
  let lastInserID = 0;     // highest id reported by any worker so far
  let workerCounters = 0;  // number of worker reports received
  let loop = 0;            // rows assigned to each worker

  // Connection settings come from the command line:
  // argv[2]=host, argv[3]=user, argv[4]=password, argv[5]=database.
  const [, , host, user, password, database] = process.argv;
  const con = mysql.createConnection({ host, user, password, database });
  // Count the pending payloads, derive how many rows each worker should take,
  // then fork the workers and tell each one its slice size.
  con.query("SELECT COUNT(*) AS counter FROM web_push_payloads", function (err, result) {
    // Fail with the real database error instead of a TypeError on `result`.
    if (err) throw err;

    // The aggregate query returns a single row carrying the total.
    const counter = result[0].counter;

    // process.argv[7] caps how many rows are handled in one run; the
    // comparison and division coerce the argument to a number.
    const maxPerRun = process.argv[7];
    const total = counter > maxPerRun ? maxPerRun : counter;
    loop = Math.ceil(total/numCPUs);

    // Fork workers; each receives the number of rows it is responsible for.
    for (let i = 0; i < numCPUs; i++) {
      const worker = cluster.fork();
      worker.send(loop);
    }
  });
  
  // Gathers one report per worker. Once every worker has reported, removes the
  // processed rows from web_push_payloads (everything up to the highest id any
  // worker reported), logs the totals and terminates the master.
  cluster.on('message', (worker, message) => {
    workerCounters += 1;

    const reportedId = message.lastInserID;
    if (reportedId > lastInserID) {
      lastInserID = reportedId;
    }

    // Taken when the report arrives, i.e. before the DELETE below runs.
    const end = Date.now();

    // Still waiting for other workers.
    if (Number(numCPUs) !== workerCounters) {
      return;
    }

    const deleteSQL = `DELETE FROM web_push_payloads WHERE id <=${lastInserID}`;
    con.query(deleteSQL, (err, result) => {
      if (err) throw err;
      const deleted = result.affectedRows;
      const seconds = (end - begin) / 1000;
      console.log(`${deleted} record(s) deleted`);
      console.log(`All worker are ended ${deleted} in ${seconds} secs`);
      con.end();
      // NOTE(review): exits with status 200 on the success path — presumably
      // the launching script checks for this value; confirm before changing.
      process.exit(200);
    });
  });
} else {
  
  // Worker entry point. The master sends `loop`, the number of rows this worker
  // should handle. The worker reads its window of web_push_payloads, turns each
  // row into prepared web-push request details, bulk-inserts them into
  // web_push_queue, reports the highest payload id it handled to the master
  // and exits.
  process.on('message', (loop) => {
    // Upper bound on rows per multi-row INSERT statement.
    const BATCH_SIZE = 5000;

    const con = mysql.createConnection({
      host: process.argv[2],
      user: process.argv[3],
      password: process.argv[4],
      database: process.argv[5]
    });

    // Sends the report to the master and shuts the worker down. Exiting from
    // the send callback keeps process.exit() from racing the IPC message.
    const finish = (lastInserID) => {
      process.send({ lastInserID: lastInserID }, () => {
        con.end();
        process.exit();
      });
    };

    // Converts one web_push_payloads row into a [request, payload] pair for
    // web_push_queue. Throws if the stored JSON is malformed or web-push
    // rejects the subscription/options.
    const buildQueueRow = (message) => {
      const request = JSON.parse(message.request);
      const pushSubscription = {
        endpoint: request.endpoint,
        keys: {
          p256dh: request.p256dh,
          auth: request.auth
        }
      };

      const options = {
        // NOTE(review): credential hard-coded in source — consider loading it
        // from configuration/environment instead.
        gcmAPIKey: 'AIzaSyBQ-MFenm1byqoT2IsdiSrm0ZK0Eey44uk',
        TTL: request.ttl,
        vapidDetails: {
          subject: 'mailto:' + process.argv[8],
          publicKey: request.vappubkey,
          privateKey: request.vapprvkey
        }
      };

      const details = webpush.generateRequestDetails(
        pushSubscription,
        message.payload,
        options
      );

      // The encrypted body is stored base64-encoded in its own column; the
      // remaining request details (plus routing metadata) are stored as JSON.
      const body = details.body.toString('base64');
      details.body = "";
      details.deviceid = request.deviceid;
      details.platid = request.platid;
      details.appid = request.appid;
      details.campid = request.campid;
      details.msg_type = request.msg_type;
      return [JSON.stringify(details), body];
    };

    con.connect(function(err) {
      if (err) throw err;
      // (worker id - 1) is used as the zero-based index of this worker's
      // window of `loop` consecutive rows.
      const start = (cluster.worker.id-1)*loop;
      con.query("SELECT * FROM web_push_payloads ORDER BY id ASC LIMIT "+start+","+loop, function (err, result) {
        if (err) throw err;
        if (! result.length){
          return finish(0);
        }

        const batches = [];
        let rows = [];
        let lastInserID = 0;

        for (const message of result) {
          // Every row of the window counts as handled, including ones that
          // cannot be converted (they are skipped below, as before).
          if (message.id > lastInserID) {
            lastInserID = message.id;
          }
          try {
            rows.push(buildQueueRow(message));
          } catch (err) {
            // A bad row must not abort the window or disturb batch accounting.
            console.error("Skipping web_push_payloads row "+message.id+":", err);
            continue;
          }
          if (rows.length >= BATCH_SIZE) {
            batches.push(rows);
            rows = [];
          }
        }
        if (rows.length) {
          batches.push(rows);
        }

        // Nothing could be converted: still report so the master is not left
        // waiting for this worker.
        if (! batches.length){
          return finish(lastInserID);
        }

        let completed = 0;
        for (const batch of batches) {
          // `VALUES ?` with a nested array lets the driver escape every value
          // and expand it into a multi-row insert.
          con.query("INSERT INTO web_push_queue (request,payload) VALUES ?", [batch], function (err) {
            // NOTE(review): a failed batch is only logged; the master still
            // deletes the source rows based on the reported id.
            if (err) console.error(err);
            completed++;
            if (completed === batches.length) {
              finish(lastInserID);
            }
          });
        }
      });
    });
  });
}

如何提高上述代码速度?

我听说过 PM2,但是在当前代码上使用 PM2 时遇到了麻烦。

标签: node.js, node-modules, pm2

解决方案


推荐阅读