首页 > 解决方案 > RabbitMQ 心跳超时问题

问题描述

我目前正在使用 RabbitMQ 作为消息代理。最近,我在错误日志中看到许多错误HeartBeat Timeout

同样在 RabbitMQ 日志中,我看到了这个日志:

在此处输入图像描述

我不知道为什么来自不同端口范围的连接太多。我使用默认设置,没有任何进一步的配置。这是我用于发布和使用的代码:

import { connect } from 'amqplib/callback_api';
import hanlder from '../calculator/middleware';
import { logger } from '../config/logger';

async function consumeRabbitMQServer(serverURL, exchange, queue) {
  connect('amqp://localhost', async (error0, connection) => {
    if (error0) throw error0;

    const channel = connection.createChannel((error1) => {
      if (error1) throw error1;
    });

    channel.assertExchange(exchange, 'direct', {
      durable: true
    });

    channel.assertQueue(
      queue,
      {
        durable: true
      },
      (error2) => {
        if (error2) throw error2;
        logger.info(`Connect to ${serverURL} using queue ${queue}`);
      }
    );

    channel.prefetch(1);

    channel.bindQueue(queue, exchange, 'info');
    channel.noAck = true;

    channel.consume(queue, (msg) => {
      hanlder(JSON.parse(msg.content.toString()))
        .then(() => {
          channel.ack(msg);
        })
        .catch((err) => {
          channel.reject(msg);
        });
    });
  });
}

export default consumeRabbitMQServer;

用于发布消息的代码:

import createConnection from './connection';
import { logger } from '../config/logger';

async function publishToRabbitMQServer(serverURL, exchange, queue) {
  const connection = createConnection(serverURL);
  const c = await connection.then(async (conn) => {
    const channel = await conn.createChannel((error1) => {
      if (error1) throw error1;
    });

    channel.assertExchange(exchange, 'direct', {
      durable: true
    });

    channel.assertQueue(
      queue,
      {
        durable: true
      },
      (error2) => {
        if (error2) throw error2;
        logger.info(`Publish to ${serverURL} using queue ${queue}`);
      }
    );

    channel.bindQueue(queue, exchange, 'info');

    return channel;
  });
  return c;
}

export default publishToRabbitMQServer;

每当我启动我的服务器时,我都会运行这段代码来创建一个客户端消费到 RabbitMQ:

const { RABBITMQ_SERVER } = process.env;
consumeRabbitMQServer(RABBITMQ_SERVER, 'abc', 'abc');

当需要将消息发布到 RabbitMQ 时,会使用这段代码

  const payloads = call.request.payloads;
  const { RABBITMQ_SERVER } = process.env;
  const channel = await publishToRabbitMQServer(RABBITMQ_SERVER, 'abc', 'abc');

  for (let i = 0; i < payloads.length; i++) {
    channel.publish('abc', 'info', Buffer.from(JSON.stringify(payloads[i])));
  }

我正在重用 RabbitMQ 文档中的代码,似乎每当有太多用户发布消息时就会出现这个问题。感谢您的帮助。更新:我认为根本原因是当我需要发布消息时,我创建了一个新连接。我正在努力改进它,任何帮助都将不胜感激。非常感谢。

标签: node.jsrabbitmq

解决方案


推荐阅读