首页 > 解决方案 > RabbitMQ DefaultConsumer 导致消费者标签过多

问题描述

我有一个监听特定队列的 RabbitMQ 客户端应用程序。客户端创建 DefaultConsumer 的实例并实现 handleDelivery 方法。这是代码

    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
//            channel.basicQos(pollCount);
            Message message = new Message();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String response = new String(body, "UTF-8");
                    if (response != null) {
                        message.setId(NUID.nextGlobal());
                        message.setPayload(response);
                        message.setDeliveryTag(deliveryTag);
                        messages.add(message);
                        logger.info("Message received: ", message.getPayload());
                    }
                }
            };
            logger.debug("**********Channel status: " + channel.isOpen());
            channel.basicConsume(queueName, false, consumer);
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);

        }
    }

方法 receiveMessages() 每 500 毫秒通过一个线程频繁调用,并将消息排入不同的 List 以供使用。由于对 receiveMessages() 的投票,我观察到消费者标签在通过兔子控制台查看时不断创建和增长,如图所示。看到那些越来越多的消费者标签是正常的吗? 在此处输入图像描述

标签: javarabbitmqamqprabbitmq-exchange

解决方案


我终于找到了一个可行的解决方案。正如 Luke Bakken 强调的那样,不需要投票。我现在只打电话receiveMesssages()一次。此后,当消息发布到队列中时,我的消费者正在接收回调。

 protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
 public void receiveMessages() {
    try {
        Message message = new Message();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            String response = new String(delivery.getBody(), "UTF-8");
            if (response != null) {
                message.setId(NUID.nextGlobal());
                message.setPayload(response);
                message.setDeliveryTag(deliveryTag);
                messages.add(message);
                logger.info("Message received: ", message.getPayload());
            };
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
    } catch (Exception e) {
        logger.error("Exception while getting messages from Rabbit ", e);
    }
}

rabbit 控制台现在在绑定队列下仅显示 1 个消费标签条目。


推荐阅读