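spring-boot - Why can't I get the correlationId on the consumer side with spring-boot-starter-amqp?
Problem description
I send messages with RabbitTemplate, passing a CorrelationData that I generate myself. I receive the correlation ID in the ConfirmCallback, but I cannot get it on the consumer side.
I tested with 2.0.3.RELEASE and 2.1.0.RELEASE, and the behavior is the same in both.
RabbitMQ configuration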
@Configuration
public class RabbitMQConfig {

    @Value("${mq.rabbit.addresses}")
    private String addresses;

    @Value("${mq.rabbit.username}")
    private String username;

    @Value("${mq.rabbit.password}")
    private String password;

    @Value("${mq.rabbit.virtualHost}")
    private String virtualHost;

    @Value("${mq.rabbit.sessionCacheSize}")
    private int sessionCacheSize;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // Comma-separated list of addresses in the form "host[:port],..."
        connectionFactory.setAddresses(addresses);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        connectionFactory.setChannelCacheSize(sessionCacheSize);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(MessageConverter messageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(messageConverter);
        template.setMandatory(true);
        template.setConfirmCallback(new ConfirmCallbackListener());
        template.setReturnCallback(new ReturnCallBackListener());
        return template;
    }

    @Bean
    public MessageConverter messageConverter(ObjectMapper customMapper) {
        return new Jackson2JsonMessageConverter(customMapper);
    }

    @Bean
    public Queue testQueue() {
        return new Queue("test-queue", true);
    }

    @Bean
    public TopicExchange defaultExchange() {
        return new TopicExchange("test-exchange", true, false);
    }

    @Bean
    public Binding bindingExchangeCommon(Queue testQueue, TopicExchange defaultExchange) {
        return BindingBuilder.bind(testQueue).to(defaultExchange).with("test");
    }

    @Bean
    public SimpleMessageListenerContainer testMessageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("test-queue");
        container.setExposeListenerChannel(true);
        container.setPrefetchCount(250);
        container.setMaxConcurrentConsumers(20);
        container.setConcurrentConsumers(10);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new TestMessageListener());
        return container;
    }
}
Confirm callback
public class ConfirmCallbackListener implements RabbitTemplate.ConfirmCallback {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            logger.info("send message ack failed: " + cause + " -> ID: " + String.valueOf(correlationData));
        } else {
            logger.info("send message ack success -> ID: " + String.valueOf(correlationData));
        }
    }
}
Return callback
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("send message failed...");
    }
}
Message listener
public class TestMessageListener implements ChannelAwareMessageListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            // Problem: getCorrelationId() always returns null here
            logger.info("handle message: {} -> ID: {}", new String(message.getBody(), "UTF-8"),
                    message.getMessageProperties().getCorrelationId());
            if (true) { // placeholder condition, always acknowledges
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                logger.info("listener ack message completed");
            } else {
                // Reject and requeue
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        } catch (Exception e) {
            logger.error("handle test message error", e);
            // Reject without requeueing
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}
Sending the message
@RestController
@RequestMapping("/rabbitmq")
public class RabbitmqCtrl {

    private AtomicLong atoId = new AtomicLong();

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostMapping("sendMsg")
    public String sendMsg(@RequestBody String content) {
        // Read the counter once so the payload ID and the correlation ID match under concurrent requests
        String id = String.valueOf(atoId.incrementAndGet());
        // Message is the application's own payload class, not org.springframework.amqp.core.Message
        Message message = new Message();
        message.setId(id);
        message.setContent(content);
        rabbitTemplate.convertAndSend("test-exchange", "test", message, new CorrelationData(id));
        return "success";
    }
}
I tried setting a CorrelationDataPostProcessor on the RabbitTemplate, like this:
template.setCorrelationDataPostProcessor(new CorrelationDataPostProcessor() {
    @Override
    public CorrelationData postProcess(Message message, CorrelationData correlationData) {
        if (correlationData != null) {
            // Copy the publisher-side ID into the message properties that are actually sent
            message.getMessageProperties().setCorrelationId(correlationData.getId());
        }
        return correlationData;
    }
});
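This way I can get the correlation ID, but since I already set the ID when sending the message, I would expect not to have to do this. Is there a more reasonable explanation?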
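Solution
The CorrelationData is not sent over the network unless you say so explicitly with a custom MessagePostProcessor, as you did with the CorrelationDataPostProcessor. The default implementation of MessagePostProcessor.postProcessMessage(Message, Correlation) looks like this:
default Message postProcessMessage(Message message, Correlation correlation) {
    return postProcessMessage(message);
}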
As you can see, the correlation is ignored entirely.
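Why the ID gets lost can be reproduced without a broker. The sketch below is a dependency-free model, not Spring AMQP code: Msg, PostProcessor, PlainProcessor, CopyingProcessor and send are hypothetical stand-ins that only mirror the shape of the default method quoted above.

```java
import java.util.HashMap;
import java.util.Map;

public class CorrelationDemo {

    /** Stand-in for a message: only these properties would travel over the wire. */
    public static final class Msg {
        public final Map<String, String> properties = new HashMap<>();
    }

    /** Stand-in for the post-processor hook, shaped like the default method quoted above. */
    public interface PostProcessor {
        Msg postProcess(Msg message);

        default Msg postProcess(Msg message, String correlationId) {
            // The correlation ID is dropped here unless an implementation overrides this method
            return postProcess(message);
        }
    }

    /** Implements only the one-argument method, so it inherits the default. */
    public static final class PlainProcessor implements PostProcessor {
        @Override
        public Msg postProcess(Msg message) {
            return message;
        }
    }

    /** Overrides the two-argument method and copies the ID into the message. */
    public static final class CopyingProcessor implements PostProcessor {
        @Override
        public Msg postProcess(Msg message) {
            return message;
        }

        @Override
        public Msg postProcess(Msg message, String correlationId) {
            if (correlationId != null) {
                message.properties.put("correlationId", correlationId);
            }
            return message;
        }
    }

    /** Simulated send: returns the correlation ID a consumer would find on the message. */
    public static String send(PostProcessor processor, String correlationId) {
        Msg sent = processor.postProcess(new Msg(), correlationId);
        return sent.properties.get("correlationId");
    }

    public static void main(String[] args) {
        System.out.println(send(new PlainProcessor(), "42"));   // prints null
        System.out.println(send(new CopyingProcessor(), "42")); // prints 42
    }
}
```

In this model the consumer only sees the ID once some processor writes it into the message itself, which matches the behavior described in the question.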
So, to send the correlation ID to the consumer side, we do have to provide a custom MessagePostProcessor and inject it into the RabbitTemplate.
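Such a custom MessagePostProcessor might look roughly like the following sketch. It assumes spring-rabbit 2.1.x on the classpath (in 2.0.x, CorrelationData lives in org.springframework.amqp.rabbit.support instead), and the class name CorrelationIdMessagePostProcessor is made up for illustration.

```java
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
// Assumption: spring-rabbit 2.1.x. In 2.0.x, import
// org.springframework.amqp.rabbit.support.CorrelationData instead.
import org.springframework.amqp.rabbit.connection.CorrelationData;

// Hypothetical class name, for illustration only
public class CorrelationIdMessagePostProcessor implements MessagePostProcessor {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        // No correlation was supplied, so there is nothing to copy
        return message;
    }

    @Override
    public Message postProcessMessage(Message message, Correlation correlation) {
        // Copy the publisher-side ID into the AMQP correlation-id property
        if (correlation instanceof CorrelationData) {
            String id = ((CorrelationData) correlation).getId();
            if (id != null) {
                message.getMessageProperties().setCorrelationId(id);
            }
        }
        return message;
    }
}
```

It could then be passed on each send, for example rabbitTemplate.convertAndSend("test-exchange", "test", message, new CorrelationIdMessagePostProcessor(), new CorrelationData("1")), or registered once with template.setBeforePublishPostProcessors(...). Whether the template hands the CorrelationData to before-publish post-processors may depend on the version, so the CorrelationDataPostProcessor from the question remains a working alternative.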