java - RabbitMQ 在发送消息到交换时返回 MQ_UNBOUND_QUEUE
问题描述
现在我正在使用此代码将消息发送到 RabbitMQ 交换:
public void resendFanoutMessage(String exchange, T messageContent) {
Object transNo = ReflectorUtil.getFieldValue(messageContent, "transNo");
CorrelationData correlationData = new CorrelationData(String.valueOf(transNo));
MessageProperties properties = new MessageProperties();
properties.setCorrelationId(String.valueOf(transNo));
ObjectMapper jsonReader = new ObjectMapper();
try {
Message message = new Message(jsonReader.writeValueAsBytes(messageContent), properties);
rabbitMqTemplate.convertAndSend(exchange,
"",
message,
correlationData);
} catch (JsonProcessingException e) {
log.error("send message failed", e);
}
}
但它MQ_UNBOUND_QUEUE
在回调函数中返回:
@Bean(name = "reportRabbitTemplate")
public RabbitTemplate firstRabbitTemplate(
@Qualifier("reportConnectionFactory") ConnectionFactory connectionFactory
) {
RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
firstRabbitTemplate.setMandatory(true);
firstRabbitTemplate.setReplyTimeout(2000);
firstRabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String messageId = correlationData.getId();
if (ack) {
WalletMqMessageExample example = new WalletMqMessageExample();
WalletMqMessageExample.Criteria criteria = example.createCriteria();
criteria.andMessageIdEqualTo(Long.valueOf(messageId));
List<WalletMqMessage> resultList = customWalletMqMessageMapper.selectByExample(example);
if (CollectionUtils.isEmpty(resultList)) {
log.error("ID:" + messageId);
}
if (ReturnCallbackStatus.MQ_UNBOUND_QUEUE.getKey().equals(resultList.get(0).getReturnCallback())) {
// run into this
log.error("exchange unbound,ID:" + messageId);
customWalletMqMessageMapper.updateMessageStatus(correlationData.getId(), MqMessageSendStatus.SEND_FAILED.getKey());
return;
}
log.info("success,ID:" + messageId);
customWalletMqMessageMapper.updateMessageStatus(correlationData.getId(), MqMessageSendStatus.SEND_SUCCESS.getKey());
} else {
log.error("exchange deleted,ID:" + messageId);
customWalletMqMessageMapper.updateMessageStatus(correlationData.getId(), MqMessageSendStatus.SEND_FAILED.getKey());
}
});
我确定 RabbitMQ 交换存在,并且交换与队列绑定,为什么仍然会出现此错误?顺便说一句,新消息生成可以成功发送。交换和队列绑定:
解决方案
推荐阅读
- python - 从顶部的路径数创建顶部对,创建连续图,只有一个选项
- python - 如何比较 URI 编码的字符串?
- javascript - 如何仅禁用 chrome 中的“调试器”关键字,该关键字由循环内的 eval 执行?
- python-3.x - 在 python3 中使用 GraphQL Payload 发布请求
- c - 递归穿越迷宫并在所有空间中插入节点
- wordpress - 我无法提交上传的联系表格 7
- xmodem - 在python中将文件发送到串口
- android - 按钮上带有箭头的警报对话框
- javascript - 如何仅以角度显示来自firebase的搜索结果?
- php - Laravel update() 函数不更新任何字段