java - spring-boot activemq 记录过期消息
问题描述
我有一个 spring-boot 简单应用程序,我想记录过期消息;在读了很多票,读了很多教程之后,它没有用。
这是我的 SpringBootApplication
@SpringBootApplication
public class Run implements ApplicationRunner {
private static Logger log = LoggerFactory.getLogger(Run.class);
@Autowired
private OrderSender orderSender;
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
log.info("Spring Boot Embedded ActiveMQ Configuration Example");
for (int i = 0; i < 5; i++) {
Order myMessage = new Order(i + " - Sending JMS Message using Embedded activeMQ", new Date());
orderSender.send(myMessage);
TimeUnit.MILLISECONDS.sleep(500);
}
log.info("Waiting for all ActiveMQ JMS Messages to be consumed");
int ttl = 10;
TimeUnit.SECONDS.sleep(ttl);
log.info(ttl + " seconds elapsed," + ttl + " more seconds to see DLQ log");
TimeUnit.SECONDS.sleep(ttl);
System.exit(-1);
}
public static void main(String[] args) throws Exception {
SpringApplication.run(Run.class, args);
}
}
这是我的 MessageSender
@Service
public class OrderSender {
//
private static Logger log = LoggerFactory.getLogger(OrderSender.class);
@Autowired
private JmsTemplate jmsTemplate;
public void send(Order myMessage) {
log.info("sending with convertAndSend() to queue <" + myMessage + ">");
jmsTemplate.convertAndSend(QUEUE, myMessage);
}
}
这是我的队列消费者 [我试图将它们分开写但没有成功]
@Component
public class OrderConsumer {
private static Logger log = LoggerFactory.getLogger(OrderConsumer.class);
@JmsListener(destination = QUEUE)
public void receiveMessage(Order order) throws InterruptedException {
String logPrefix = UUID.randomUUID().toString() + " - ";
log.info(logPrefix + "received <" + order + ">");
log.debug(logPrefix + "- - - - - - - - SLEEP - - - - - - - - - - - - -");
TimeUnit.SECONDS.sleep(8);
log.debug(logPrefix + "- - - - - - - - END SLEEP - - - - - - - - - - -");
}
@JmsListener(destination = QUEUE_DLQ)
public void receiveMessageDlq(Order order) throws InterruptedException {
String logPrefix = UUID.randomUUID().toString() + " - ";
log.info(logPrefix + "DLQ <" + order + ">");
}
}
这是我的配置 bean
@EnableJms
@Configuration
public class ActiveMQConfig {
//
public static final String QUEUE = "orderqueue";
public static final String SUFFIX = ".dlq";
public static final String QUEUE_DLQ = QUEUE + SUFFIX;
@Bean
public BrokerService broker(@Autowired DeadLetterStrategy strategy) throws Exception {
BrokerService broker = new BrokerService();
broker.addConnector("vm://embedded?broker.persistent=false,useShutdownHook=false");
//
PolicyEntry entry = new PolicyEntry();
entry.setDestination(new ActiveMQQueue("*"));
// entry.setDestination(new ActiveMQQueue(">"));
entry.setDeadLetterStrategy(strategy);
//
PolicyMap map = new PolicyMap();
map.setPolicyEntries(Arrays.asList(entry));
broker.setDestinationPolicy(map);
//
return broker;
}
@Bean
public DeadLetterStrategy deadLetterStrategy() {
IndividualDeadLetterStrategy ids = new IndividualDeadLetterStrategy(); //
ids.setQueueSuffix(SUFFIX);
ids.setUseQueueForQueueMessages(true);
return ids;
}
@Bean
public JmsListenerContainerFactory<?> queueListenerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setMessageConverter(messageConverter());
return factory;
}
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public Queue myQueue() {
ActiveMQQueue queue = new ActiveMQQueue(QUEUE);
return queue;
}
}
这是在我的 application.properties
spring.jms.listener.max-concurrency=1
spring.jms.template.time-to-live=5000
这是在我的 pom.xml [部分]
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
这就是我的日志
Starting Run on ...
No active profile set, falling back to default profiles: default
Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@4516af24: startup date [Thu Mar 12 14:46:40 CET 2020]; root of context hierarchy
Using Persistence Adapter: KahaDBPersistenceAdapter[C:\developements\customers\m2sc\workspace\spring-embedded-activemq\activemq-data\localhost\KahaDB]
JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
KahaDB is version 6
Recovering from the journal @1:104464
Recovery replayed 4 operations from the journal in 0.02 seconds.
PListStore:[C:\developements\customers\m2sc\workspace\spring-embedded-activemq\activemq-data\localhost\tmp_storage] started
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) is starting
Connector vm://embedded?broker.persistent=false,useShutdownHook=false started
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) started
For help or more information please see: http://activemq.apache.org
Registering beans for JMX exposure on startup
Starting beans in phase 2147483647
Connector vm://localhost started
Spring Boot Embedded ActiveMQ Configuration Example
>> sending with convertAndSend() to queue <Order{content='0 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:41 CET 2020}>
acbc670e-de0c-49ac-96ef-763675920069 - received <Order{content='0 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:41 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='1 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:42 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='2 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:43 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='3 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:43 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='4 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:44 CET 2020}>
Waiting for all ActiveMQ JMS Messages to be consumed
10 seconds elapsed,10 more seconds to see DLQ log
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) is shutting down
Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@4516af24: startup date [Thu Mar 12 14:46:40 CET 2020]; root of context hierarchy
Connector vm://embedded?broker.persistent=false,useShutdownHook=false stopped
Stopping beans in phase 2147483647
Connector vm://localhost stopped
Setup of JMS message listener invoker failed for destination 'orderqueue.dlq' - trying to recover. Cause: peer (vm://localhost#1) stopped.
PListStore:[C:\developements\customers\m2sc\workspace\spring-embedded-activemq\activemq-data\localhost\tmp_storage] stopped
Stopping async queue tasks
Stopping async topic tasks
Setup of JMS message listener invoker failed for destination 'orderqueue' - trying to recover. Cause: peer (vm://localhost#3) stopped.
Stopped KahaDB
Unregistering JMX-exposed beans on shutdown
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) uptime 23.525 seconds
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) is shutdown
如您所见,我发送了 5 条消息,仅处理了 1 条,DLQ 中没有人处理……我在 Stack 上花了很多时间,但没有找到解决方案。请有人帮助我!
解决方案
使用这个特定版本的spring-boot
(1.5.8) 和active-mq
,它不会为每个自定义队列实例化 DLQ 队列,但检查 DEBUG 级别日志我发现了一个名为ActiveMQ.DLQ
created 的通用 DLQ 队列来处理这个问题,只是创建了一个带有注释的方法:
org.springframework.jms.annotation.JmsListener(destination = "ActiveMQ.DLQ")
推荐阅读
- sql - 如果表单上没有更改,则停止 Doctrine 执行更新查询
- angular-material - Angular Material 6 覆盖输入/选择样式
- python - keras 自定义损失纯 python(没有 keras 后端)
- c# - 了解 Kademlia find_node 并将节点添加到路由表
- javascript - 我可以在 Object.defineProperty(Object.prototype) 挂钩上缓存值吗?
- vba - 在 Word 中用尾注文本替换参考
- checkbox - Vue js 复选框自定义组件
- scala - spark scala 每个数据集输出为单行数据框
- r - R:ggplot2 使用 facet_wrap 设置中间的最后一个图
- python - 从具有子父关系的列表开始,创建以子代为列表的父代