RabbitMQ returns MQ_UNBOUND_QUEUE when sending a message to an exchange

Problem description

I am currently using this code to send a message to a RabbitMQ exchange:

public void resendFanoutMessage(String exchange, T messageContent) {
    // Use the business transaction number as the correlation ID so the
    // confirm callback can match the broker's ack to the stored message.
    Object transNo = ReflectorUtil.getFieldValue(messageContent, "transNo");
    CorrelationData correlationData = new CorrelationData(String.valueOf(transNo));
    MessageProperties properties = new MessageProperties();
    properties.setCorrelationId(String.valueOf(transNo));
    ObjectMapper jsonReader = new ObjectMapper();
    try {
        Message message = new Message(jsonReader.writeValueAsBytes(messageContent), properties);
        // Empty routing key; a fanout exchange ignores it.
        rabbitMqTemplate.convertAndSend(exchange, "", message, correlationData);
    } catch (JsonProcessingException e) {
        log.error("send message failed", e);
    }
}

But the confirm callback ends up in the MQ_UNBOUND_QUEUE branch:

@Bean(name = "reportRabbitTemplate")
    public RabbitTemplate firstRabbitTemplate(
            @Qualifier("reportConnectionFactory") ConnectionFactory connectionFactory
    ) {
        RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
        firstRabbitTemplate.setMandatory(true);
        
        firstRabbitTemplate.setReplyTimeout(2000);
        firstRabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String messageId = correlationData.getId();
            
            if (ack) {
                WalletMqMessageExample example = new WalletMqMessageExample();
                WalletMqMessageExample.Criteria criteria = example.createCriteria();
                criteria.andMessageIdEqualTo(Long.valueOf(messageId));
                List<WalletMqMessage> resultList = customWalletMqMessageMapper.selectByExample(example);
                if (CollectionUtils.isEmpty(resultList)) {
                    log.error("ID:" + messageId);
                    return; // no stored message; also avoids resultList.get(0) on an empty list
                }
                if (ReturnCallbackStatus.MQ_UNBOUND_QUEUE.getKey().equals(resultList.get(0).getReturnCallback())) {
                    // run into this
                    log.error("exchange unbound,ID:" + messageId);
                    customWalletMqMessageMapper.updateMessageStatus(correlationData.getId(), MqMessageSendStatus.SEND_FAILED.getKey());
                    return;
                }

                
                log.info("success,ID:" + messageId);
                customWalletMqMessageMapper.updateMessageStatus(correlationData.getId(), MqMessageSendStatus.SEND_SUCCESS.getKey());
            } else {
                log.error("exchange deleted,ID:" + messageId);
                customWalletMqMessageMapper.updateMessageStatus(correlationData.getId(), MqMessageSendStatus.SEND_FAILED.getKey());
            }
        });

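I am sure the RabbitMQ exchange exists and that it is bound to a queue, so why does this error still occur? By the way, newly generated messages are sent successfully. The exchange and queue bindings: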

[Image: exchange and queue bindings]

Tags: java, rabbitmq

Solution
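
One thing worth checking, offered as an unconfirmed hypothesis: the confirm callback in the question does not learn about routing from the broker. It reads the returnCallback value stored for the message ID. If a return callback (not shown in the question) wrote MQ_UNBOUND_QUEUE to that row during an earlier failed attempt, a resend with the same transNo would still find the old value even when the binding is now correct. That would be consistent with new messages succeeding while resent ones fail. The plain-Java sketch below simulates this with an in-memory map; `returnFlagById`, `onReturned`, `onConfirm`, and `resetBeforeResend` are hypothetical stand-ins, not Spring AMQP APIs and not the asker's code.

```java
import java.util.HashMap;
import java.util.Map;

class StaleReturnFlagDemo {
    static final String MQ_UNBOUND_QUEUE = "MQ_UNBOUND_QUEUE";

    // Hypothetical stand-in for the message table: messageId -> stored returnCallback value.
    static final Map<String, String> returnFlagById = new HashMap<>();

    // Assumed behavior of a return callback: record that the message was unroutable.
    static void onReturned(String messageId) {
        returnFlagById.put(messageId, MQ_UNBOUND_QUEUE);
    }

    // Mirrors the decision in the question's confirm callback:
    // an ack combined with a stored MQ_UNBOUND_QUEUE value is treated as a failure.
    static String onConfirm(String messageId, boolean ack) {
        if (!ack) {
            return "SEND_FAILED";
        }
        return MQ_UNBOUND_QUEUE.equals(returnFlagById.get(messageId))
                ? "SEND_FAILED"
                : "SEND_SUCCESS";
    }

    // Hypothetical remedy: clear the stored value before publishing the same ID again.
    static void resetBeforeResend(String messageId) {
        returnFlagById.remove(messageId);
    }

    public static void main(String[] args) {
        String id = "1001";

        // First attempt: the message is returned as unroutable, then acked.
        onReturned(id);
        System.out.println("first attempt:       " + onConfirm(id, true));

        // Resend after the binding exists: no return arrives, but the old value is still stored.
        System.out.println("resend, stale value: " + onConfirm(id, true));

        // Resend after clearing the stored value first.
        resetBeforeResend(id);
        System.out.println("resend, after reset: " + onConfirm(id, true));
    }
}
```

If the stored value does turn out to be left over from a previous attempt, resetting it before calling resendFanoutMessage (or keying the lookup on something unique per attempt) would be one way to keep the confirm callback from misreading a successful resend.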

