node.js - amqp.connect 无法永久保持连接
问题描述
我的代码如下所示:
rabbitmq.js
const connectRabbitMq = () => {
amqp.connect(process.env.CLOUDAMQP_MQTT_URL, function (err, conn) {
if (err) {
console.error(err);
console.log('[AMQP] reconnecting in 1s');
setTimeout(connectRabbitMq, 1);
return;
}
conn.createChannel((err, ch) => {
if (!err) {
console.log('Channel created');
channel = ch;
connection = conn;
}
});
conn.on("error", function (err) {
if (err.message !== "Connection closing") {
console.error("[AMQP] conn error", err.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
connectRabbitMq();
});
})
};
const sendMessage = () => {
let data = {
user_id: 1,
test_id: 2
};
if (channel) {
channel.sendToQueue(q, new Buffer(JSON.stringify(data)), {
persistent: true
});
}
else {
connectRabbitMq(() => {
channel.sendToQueue(q, new Buffer(JSON.stringify(data)), {
persistent: true
});
})
}
};
const receiveMessage = () => {
if (channel) {
channel.consume(q, function (msg) {
// ch.ack(msg);
console.log(" [x] Received %s", msg.content.toString());
});
}
else {
connectRabbitMq(() => {
channel.consume(q, function (msg) {
// ch.ack(msg);
console.log(" [x] Received %s", msg.content.toString());
});
})
}
}
调度程序.js
let cron = require('node-cron');
const callMethodForeverRabbitMq = () => {
cron.schedule('*/1 * * * * *', function () {
rabbitMqClientPipeline.receiveMessage();
});
};
应用程序.js
rabbitmq.sendMessage();
现在这里发生的是,代码不能永远保持连接。那么有什么办法可以让它永远活着吗?
解决方案
我不确定您使用的是 Promise api 还是回调 API。
使用 Promise API,您可以这样做:
const amqp = require('amqplib');
const delay = (ms) => new Promise((resolve => setTimeout(resolve, ms)));
const connectRabbitMq = () => amqp.connect('amqp://127.0.0.1:5672')
.then((conn) => {
conn.on('error', function (err) {
if (err.message !== 'Connection closing') {
console.error('[AMQP] conn error', err.message);
}
});
conn.on('close', function () {
console.error('[AMQP] reconnecting');
connectRabbitMq();
});
//connection = conn;
return conn.createChannel();
})
.then(ch => {
console.log('Channel created');
//channel = ch;
})
.catch((error) => {
console.error(error);
console.log('[AMQP] reconnecting in 1s');
return delay(1000).then(() => connectRabbitMq())
});
connectRabbitMq();
使用这样的回调 API:
const amqp = require('amqplib/callback_api');
const connectRabbitMq = () => {
amqp.connect('amqp://127.0.0.1:5672', function (err, conn) {
if (err) {
console.error(err);
console.log('[AMQP] reconnecting in 1s');
setTimeout(connectRabbitMq, 1000);
return;
}
conn.createChannel((err, ch) => {
if (!err) {
console.log('Channel created');
//channel = ch;
//connection = conn;
}
});
conn.on("error", function (err) {
if (err.message !== "Connection closing") {
console.error("[AMQP] conn error", err.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
connectRabbitMq();
});
})
};
connectRabbitMq();
使用请求缓冲更新新代码
const buffer = [];
let connection = null;
let channel = null;
const connectRabbitMq = () => {
amqp.connect('amqp://127.0.0.1:5672', function (err, conn) {
if (err) {
console.error(err);
console.log('[AMQP] reconnecting in 1s');
setTimeout(connectRabbitMq, 1000);
return;
}
conn.createChannel((err, ch) => {
if (!err) {
console.log('Channel created');
channel = ch;
connection = conn;
while (buffer.length > 0) {
const request = buffer.pop();
request();
}
}
});
conn.once("error", function (err) {
channel = null;
connection = null;
if (err.message !== "Connection closing") {
console.error("[AMQP] conn error", err.message);
}
});
conn.once("close", function () {
channel = null;
connection = null;
console.error("[AMQP] reconnecting");
connectRabbitMq();
});
})
};
const sendMessage = () => {
let data = {
user_id: 1,
test_id: 2
};
if (channel) {
channel.sendToQueue(q, new Buffer(JSON.stringify(data)), {
persistent: true
});
}
else {
buffer.push(() => {
channel.sendToQueue(q, new Buffer(JSON.stringify(data)), {
persistent: true
});
});
}
};
const receiveMessage = () => {
if (channel) {
channel.consume(q, function (msg) {
// ch.ack(msg);
console.log(" [x] Received %s", msg.content.toString());
});
}
else {
buffer.push(() => {
channel.consume(q, function (msg) {
// ch.ack(msg);
console.log(" [x] Received %s", msg.content.toString());
});
})
}
};
在某些极端情况下,此代码将不起作用 - 例如,queue.consume
除非明确调用它,否则它不会重新建立。但总的来说,这希望能让您了解如何实施适当的恢复......
推荐阅读
- google-app-maker - 如何在表格中显示特定月份的数据?
- c++ - 为什么我必须在 std::array 的初始化时为每个项目指定类型
在 C++ 中 - pandas - 重建缺失的数据
- dialogflow-es - “让我们获取 AppName”和“Here's AppName”现在始终存在;用于在一定次数的频繁调用后被删除
- plsql - 有人可以帮助解决这些plsql问题吗,我很难完全理解它
- c# - 尝试在服务层.Net Core中使用自动映射器时出现红色下划线错误
- swift - Swift 中奇怪的 UI 错误
- hive - 错误:org.apache.thrift.transport.TTransportException java.net.SocketException: Broken pipe (Write failed) (State=08S01,code=0)
- vue.js - 页面加载后创建 VUE 组件
- javascript - 如何样式化谷歌地图信息窗口