首页 > 解决方案 > Rabbitmq nodejs 关闭连接

问题描述

我对rabbitmq有疑问。

我想知道在发送“pub”消息时是否需要关闭与通道的连接和amqpblib连接,与消费者一样吗?或者正确的做法是保持连接打开?

我有这个要发布和订阅:

public produce = async <T>(queue: string, message: T): Promise<boolean> => {
    try {
      if (!this.connection) await this.start();
      await this.initChannel(queue);
      const sendResult = this.channel.sendToQueue(queue, Buffer.from(message), {
        persistent: true,
      });
      if (!sendResult) {
        await new Promise(resolve => this.channel.once('drain', () => resolve));
      }
      return sendResult;
    } catch (error) {
      Logger.info(error.message);
      return false;
    } finally {
      this.close();
    }
  };

订阅

public subscribe = async (
    queue: string,
    onMessage: (msg: IMessage) => boolean,
  ): Promise<void> => {
    if (!this.connection) await this.start();
    const channel = await this.initChannel(queue);
    channel.consume(queue, message => {
      if (!message) return false;
      const body = <IMessage>JSON.parse(message.content.toString());
      if (body && onMessage(body)) onMessage(body);
      channel.ack(message);
    });
  };

这将初始化连接和侦听器事件:

private start = async () => {
    try {
      this.connection = await connect(this.rabbitUrl);
      Logger.info('connect to RabbitMQ success');
      await this.listeners();
    } catch (err) {
      Logger.info(err.message);
      sleep(this.start, 10000);
    }
  };

private listeners = async (): Promise<Connection> => {
    return (
      this.connection.on('error', (err: Error) => {
        Logger.info(err.message);
        sleep(this.start, 10000);
      }) &&
      this.connection.on('close', () => {
        Logger.info('connection to RabbitQM closed!');
        sleep(this.start, 10000);
      })
    );
  };

标签: node.jsrabbitmqnode-amqp

解决方案


创建连接是昂贵的,通道是轻量级的,但在大多数客户端中是线程不安全的。

对于 Node.js 应用程序(单线程模型),理想情况下只有两个连接publishsubscribe,每个连接一个通道。

所以请保持连接和通道打开。


推荐阅读