node.js - RabbitMQ 心跳超时问题
问题描述
我目前正在使用 RabbitMQ 作为消息代理。最近,我在错误日志中看到许多错误HeartBeat Timeout。
同样在 RabbitMQ 日志中,我看到了这个日志:
我不知道为什么来自不同端口范围的连接太多。我使用默认设置,没有任何进一步的配置。这是我用于发布和使用的代码:
import { connect } from 'amqplib/callback_api';
import hanlder from '../calculator/middleware';
import { logger } from '../config/logger';
async function consumeRabbitMQServer(serverURL, exchange, queue) {
connect('amqp://localhost', async (error0, connection) => {
if (error0) throw error0;
const channel = connection.createChannel((error1) => {
if (error1) throw error1;
});
channel.assertExchange(exchange, 'direct', {
durable: true
});
channel.assertQueue(
queue,
{
durable: true
},
(error2) => {
if (error2) throw error2;
logger.info(`Connect to ${serverURL} using queue ${queue}`);
}
);
channel.prefetch(1);
channel.bindQueue(queue, exchange, 'info');
channel.noAck = true;
channel.consume(queue, (msg) => {
hanlder(JSON.parse(msg.content.toString()))
.then(() => {
channel.ack(msg);
})
.catch((err) => {
channel.reject(msg);
});
});
});
}
export default consumeRabbitMQServer;
用于发布消息的代码:
import createConnection from './connection';
import { logger } from '../config/logger';
async function publishToRabbitMQServer(serverURL, exchange, queue) {
const connection = createConnection(serverURL);
const c = await connection.then(async (conn) => {
const channel = await conn.createChannel((error1) => {
if (error1) throw error1;
});
channel.assertExchange(exchange, 'direct', {
durable: true
});
channel.assertQueue(
queue,
{
durable: true
},
(error2) => {
if (error2) throw error2;
logger.info(`Publish to ${serverURL} using queue ${queue}`);
}
);
channel.bindQueue(queue, exchange, 'info');
return channel;
});
return c;
}
export default publishToRabbitMQServer;
每当我启动我的服务器时,我都会运行这段代码来创建一个客户端消费到 RabbitMQ:
const { RABBITMQ_SERVER } = process.env;
consumeRabbitMQServer(RABBITMQ_SERVER, 'abc', 'abc');
当需要将消息发布到 RabbitMQ 时,会使用这段代码
const payloads = call.request.payloads;
const { RABBITMQ_SERVER } = process.env;
const channel = await publishToRabbitMQServer(RABBITMQ_SERVER, 'abc', 'abc');
for (let i = 0; i < payloads.length; i++) {
channel.publish('abc', 'info', Buffer.from(JSON.stringify(payloads[i])));
}
我正在重用 RabbitMQ 文档中的代码,似乎每当有太多用户发布消息时就会出现这个问题。感谢您的帮助。更新:我认为根本原因是当我需要发布消息时,我创建了一个新连接。我正在努力改进它,任何帮助都将不胜感激。非常感谢。
解决方案
推荐阅读
- c# - 通过 Ajax 调用将经过身份验证的 WIF 会话从 Web App A 传递到 Web App B
- web-crawler - FastUrlFilter 不适用于多个域
- c# - MVC 错误:尽管输入了有效信息,但仍返回“错误”视图而不是“成功”视图
- c++ - 如何使用 const char* 和/或字符串重载 operator=?
- directed-acyclic-graphs - 在图论中,一个强连通分量SCC是否构成一个DAG?
- python - 用python列表中的元素替换字典的值
- c++ - C++ 刽子手项目
- sml - 如何从文件中正确导入模块?
- rust - 如何将非常大的十进制字符串转换为十六进制?
- django - 将多个 uwsgi 记录器记录到标准输出