spring-boot - 使用 Spring Boot、JMS 和 ActiveMq 设置消息优先级
问题描述
我正在尝试使用 Spring Boot 和 ActiveMQ 使用 JmsTemplate 发送具有不同优先级的消息,但它不起作用。
我试过这个:
MessageCreator mc = session -> {
TextMessage tm = session.createTextMessage("hello");
tm.setJMSPriority(6);
return tm;
};
jmsTemplate.send((Queue) () -> "box", mc);
ActiveMQ 代理内部的优先级仍然是 4(默认值)。
我发现实际更改消息优先级的唯一方法是更改 JmsTemplate 级别的优先级。
jmsTemplate.setPriority(3);
现在的问题是,在此之后发送的所有消息都将具有优先级 3。我知道我可以在每次发送后重置 JmsTemplate 优先级,但它不是“干净”的,并发呢?
如何为每条消息设置优先级并使用@JmsListener 获取具有最高优先级的消息?
解决方案
我刚刚遇到了同样的问题。
我测试了你关于设置 jmsTemplate 优先级的观点,你的假设是正确的。并发处理不当。
我发现可行的解决方案(尽管不理想)是扩展 JmsTemplate 并覆盖 doSend 方法以将 JmsPriority 从消息复制到生产者。这并不理想,在 spring boot 版本上扩展类 make break(我已经在 2.1.7 上测试过),并且有一些额外的步骤来注册新的 JmsTemplate。但它确实有效,我已经在负载下对其进行了测试。
脚步.....
创建一个扩展 JmsTemplate 的新类,覆盖 doSend 方法以从消息中复制优先级
import java.io.Serializable;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import org.springframework.jms.core.JmsTemplate;
public class RcsJmsTemplate extends JmsTemplate implements Serializable {
public RcsJmsTemplate() {
}
public RcsJmsTemplate(ConnectionFactory connectionFactory) {
super(connectionFactory);
}
/**
* Actually send the given JMS message.
*
* AF: EXTENDED TO COPY THE PRIORITY FROM THE MESSAGE TO THE PRODUCER
*
* @param producer the JMS MessageProducer to send with
* @param message the JMS Message to send
* @throws JMSException if thrown by JMS API methods
*/
@Override
protected void doSend(MessageProducer producer, Message message) throws JMSException {
if (getDeliveryDelay() >= 0) {
producer.setDeliveryDelay(getDeliveryDelay());
}
producer.send(message, getDeliveryMode(), message.getJMSPriority(), getTimeToLive());
}
}
添加一个bean(到您的App.java,或适当的配置类)您可能不需要通过消息转换器(我在我的项目中使用Jackson)可能还需要将其他配置应用于新的JmsTemplate。
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RcsJmsTemplate rcsJmsTemplate = new RcsJmsTemplate(connectionFactory);
rcsJmsTemplate.setMessageConverter(messageConverter);
return rcsJmsTemplate;
}
然后在您的问题中设置消息的 JmsPriority 属性。您使用 MessageCreator 但在我的项目中我使用消息后处理
public void convertAndSendWithPriority(JmsTemplate jmsTemplate, String destination, Object message, int priority) {
jmsTemplate.convertAndSend(destination, message, (Message jmsMessage) -> {
jmsMessage.setJMSPriority(priority);
return jmsMessage;
});
}
为了完整起见,您应该添加属性: spring.jms.template.qos-enabled=true
而已。希望它有所帮助(实际上我希望有人提出更好的答案)谢谢