首页 > 解决方案 > amqp.connect 无法永久保持连接

问题描述

我的代码如下所示:

rabbitmq.js

/**
 * Opens the AMQP connection and a channel, stores them in the outer
 * `connection` / `channel` variables, and reconnects whenever the
 * connection closes.
 *
 * @param {Function} [onReady] - Optional callback, invoked with no arguments
 *   once the channel has been created and stored. Omitting it is fine.
 */
const connectRabbitMq = (onReady) => {
    amqp.connect(process.env.CLOUDAMQP_MQTT_URL, function (err, conn) {
        if (err) {
            console.error(err);
            console.log('[AMQP] reconnecting in 1s');
            // setTimeout takes milliseconds: 1000 matches the "1s" announced above.
            // The callback is carried over so a pending caller is still served.
            setTimeout(() => connectRabbitMq(onReady), 1000);
            return;
        }
        conn.createChannel((channelErr, ch) => {
            if (channelErr) {
                // Report the failure instead of silently leaving `channel` unset.
                console.error('[AMQP] channel error', channelErr.message);
                return;
            }
            console.log('Channel created');
            channel = ch;
            connection = conn;
            if (typeof onReady === 'function') {
                onReady();
            }
        });

        conn.on("error", function (connErr) {
            if (connErr.message !== "Connection closing") {
                console.error("[AMQP] conn error", connErr.message);
            }
        });

        conn.on("close", function () {
            console.error("[AMQP] reconnecting");
            // Reconnect without `onReady` so a one-shot callback is not replayed.
            connectRabbitMq();
        });
    })

};


/**
 * Publishes a fixed test payload to queue `q` as a persistent message.
 *
 * When no channel is available yet, hands the publish step to
 * connectRabbitMq as a callback.
 * NOTE(review): that path only publishes if connectRabbitMq actually invokes
 * the callback it is given — confirm against its definition.
 */
const sendMessage = () => {
    const data = {
        user_id: 1,
        test_id: 2
    };

    const publish = () => {
        // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
        channel.sendToQueue(q, Buffer.from(JSON.stringify(data)), {
            persistent: true
        });
    };

    if (channel) {
        publish();
    } else {
        connectRabbitMq(publish);
    }
};


/**
 * Starts consuming queue `q` and logs the body of every delivered message.
 *
 * When no channel is available yet, hands the consume step to
 * connectRabbitMq as a callback.
 * NOTE(review): that path only consumes if connectRabbitMq actually invokes
 * the callback it is given — confirm against its definition.
 */
const receiveMessage = () => {
    const onMessage = (msg) => {
        // With amqplib the consumer callback receives null when the broker
        // cancels the consumer; reading msg.content would throw in that case.
        if (!msg) {
            console.error("[AMQP] consumer cancelled");
            return;
        }
        // Messages are not acknowledged here; call channel.ack(msg) if
        // manual acknowledgement is required.
        console.log(" [x] Received %s", msg.content.toString());
    };

    const startConsuming = () => {
        channel.consume(q, onMessage);
    };

    if (channel) {
        startConsuming();
    } else {
        connectRabbitMq(startConsuming);
    }
}

scheduler.js

let cron = require('node-cron');


// Registers a recurring cron job whose tick calls
// rabbitMqClientPipeline.receiveMessage().
const callMethodForeverRabbitMq = () => {
    // Six-field expression; presumably node-cron treats the leading field as
    // seconds, so this fires every second — confirm against node-cron's docs.
    const schedule = '*/1 * * * * *';

    const pollQueue = () => {
        rabbitMqClientPipeline.receiveMessage();
    };

    cron.schedule(schedule, pollQueue);
};

app.js

// Invoke sendMessage on the `rabbitmq` object
// (presumably the exports of rabbitmq.js — confirm against the require line).
rabbitmq.sendMessage();

现在的问题是,这段代码无法让连接一直保持。有什么办法可以让连接永久保持存活吗?

标签: node.js, rabbitmq, amqp

解决方案


我不确定您使用的是 Promise api 还是回调 API。

使用 Promise API,您可以这样做:

const amqp = require('amqplib');

// Returns a promise that resolves (with no value) once `ms` milliseconds have elapsed.
const delay = (ms) => {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
};

/**
 * Connects to the broker with the promise API, opens a channel, and keeps
 * reconnecting: immediately when the connection closes, and after a
 * one-second pause when connecting or opening the channel fails.
 *
 * @returns {Promise<void>} Settles once a channel has been created
 *   (possibly after one or more retries).
 */
const connectRabbitMq = () => {
  // Wire up the connection listeners, then request a channel.
  const onConnected = (conn) => {
    conn.on('error', (err) => {
      if (err.message === 'Connection closing') {
        return;
      }
      console.error('[AMQP] conn error', err.message);
    });

    conn.on('close', () => {
      console.error('[AMQP] reconnecting');
      connectRabbitMq();
    });

    // Keep a reference to `conn` here if other code needs the connection.
    return conn.createChannel();
  };

  // Keep a reference to the channel here if other code needs to publish/consume.
  const onChannel = () => {
    console.log('Channel created');
  };

  // Any failure above lands here: log it, wait, and start over.
  const retryLater = (error) => {
    console.error(error);
    console.log('[AMQP] reconnecting in 1s');
    return delay(1000).then(() => connectRabbitMq());
  };

  return amqp.connect('amqp://127.0.0.1:5672')
    .then(onConnected)
    .then(onChannel)
    .catch(retryLater);
};

// Start the first connection attempt; connectRabbitMq schedules its own retries and reconnects.
connectRabbitMq();

使用这样的回调 API:

const amqp = require('amqplib/callback_api');

/**
 * Connects to the broker with the callback API, opens a channel, and keeps
 * reconnecting: one second after a failed connection attempt, and
 * immediately when an established connection closes.
 */
const connectRabbitMq = () => {
  amqp.connect('amqp://127.0.0.1:5672', function (err, conn) {
    if (err) {
      console.error(err);
      console.log('[AMQP] reconnecting in 1s');
      setTimeout(connectRabbitMq, 1000);
      return;
    }
    conn.createChannel((channelErr, ch) => {
      if (channelErr) {
        // Report the failure instead of silently ending up without a channel.
        console.error('[AMQP] channel error', channelErr.message);
        return;
      }
      console.log('Channel created');
      // Keep references to `ch` and `conn` here if other code needs them.
    });

    conn.on("error", function (connErr) {
      if (connErr.message !== "Connection closing") {
        console.error("[AMQP] conn error", connErr.message);
      }
    });

    conn.on("close", function () {
      console.error("[AMQP] reconnecting");
      connectRabbitMq();
    });
  })
};

// Start the first connection attempt; connectRabbitMq schedules its own retries and reconnects.
connectRabbitMq();

更新:带请求缓冲的新代码

// Zero-argument functions queued by sendMessage/receiveMessage while no channel is available.
const buffer = [];
// Current AMQP connection; reset to null when the connection errors or closes.
let connection = null;
// Current AMQP channel; reset to null when the connection errors or closes.
let channel = null;

/**
 * Connects to the broker, opens a channel, publishes both through the outer
 * `connection` / `channel` variables, and replays every request queued in
 * `buffer` while no channel was available.
 *
 * Reconnects one second after a failed connection attempt and immediately
 * when an established connection closes.
 */
const connectRabbitMq = () => {
  amqp.connect('amqp://127.0.0.1:5672', function (err, conn) {
    if (err) {
      console.error(err);
      console.log('[AMQP] reconnecting in 1s');
      setTimeout(connectRabbitMq, 1000);
      return;
    }
    conn.createChannel((channelErr, ch) => {
      if (channelErr) {
        // Report the failure instead of silently ending up without a channel.
        console.error('[AMQP] channel error', channelErr.message);
        return;
      }
      console.log('Channel created');
      channel = ch;
      connection = conn;

      // Replay oldest-first (shift, not pop) so queued messages keep the
      // order in which they were requested.
      while (buffer.length > 0) {
        const request = buffer.shift();
        request();
      }
    });

    conn.once("error", function (connErr) {
      channel = null;
      connection = null;

      if (connErr.message !== "Connection closing") {
        console.error("[AMQP] conn error", connErr.message);
      }
    });

    conn.once("close", function () {
      channel = null;
      connection = null;

      console.error("[AMQP] reconnecting");
      connectRabbitMq();
    });
  })
};

/**
 * Publishes a fixed test payload to queue `q` as a persistent message.
 * While no channel is available the publish step is queued in `buffer`
 * instead of being executed.
 */
const sendMessage = () => {
  const data = {
    user_id: 1,
    test_id: 2
  };

  const publish = () => {
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    channel.sendToQueue(q, Buffer.from(JSON.stringify(data)), {
      persistent: true
    });
  };

  if (channel) {
    publish();
  } else {
    buffer.push(publish);
  }
};

/**
 * Starts consuming queue `q` and logs the body of every delivered message.
 * While no channel is available the consume step is queued in `buffer`
 * instead of being executed.
 */
const receiveMessage = () => {
  const onMessage = (msg) => {
    // With amqplib the consumer callback receives null when the broker
    // cancels the consumer; reading msg.content would throw in that case.
    if (!msg) {
      console.error("[AMQP] consumer cancelled");
      return;
    }
    // Messages are not acknowledged here; call channel.ack(msg) if
    // manual acknowledgement is required.
    console.log(" [x] Received %s", msg.content.toString());
  };

  const startConsuming = () => {
    channel.consume(q, onMessage);
  };

  if (channel) {
    startConsuming();
  } else {
    buffer.push(startConsuming);
  }
};

在某些边界情况下,这段代码仍然无法正常工作——例如,除非显式地再次调用 queue.consume,否则消费者不会在重新连接后自动重新建立。但总体而言,希望这能让您了解如何实现恰当的恢复机制……


推荐阅读