java - 如何从 activemq 中删除已使用的消息?
问题描述
我需要在使用它后不久从 activemq 中删除消息。例如,我向队列发送一条消息,然后使用它,我需要将它从队列中删除。我在这里使用了一个消息存储和一个 clear() 方法。消息被添加到队列中并仅从消息存储中删除,而不是从队列中删除。我需要一种从队列中删除消息的方法。感谢你的帮助。!
我试过下面的代码。
制片人
@Component
public class JmsProducer {
@Autowired
JmsTemplate jmsTemplate;
@Value("${gkz.activemq.queue}")
String queue;
public void send(Customer customer){
jmsTemplate.convertAndSend(queue, customer);
}
}
消费者
@Component
public class JmsConsumer {
@Autowired
private MessageStorage customerStorage;
@JmsListener(destination = "${gkz.activemq.queue}",containerFactory="jsaFactory")
public void receive(Customer customer){
System.out.println("Recieved Message: " + customer);
customerStorage.add(customer);
}
}
控制器
@PostMapping(value="/api/customer")
public Customer postCustomer(@RequestBody Customer customer){
jmsProducer.send(customer);
return customer;
}
@GetMapping(value="/api/customers")
public List<Customer> getAll(){
List<Customer> customers = customerStorage.getAll();
return customers;
}
@DeleteMapping(value="/api/customers/clear")
public String clearCustomerStorage() {
customerStorage.clear();
return "Clear All CustomerStorage!";
}
消息存储
public class MessageStorage {
private List<Customer> customers = new ArrayList<>();
public void add(Customer customer) {
customers.add(customer);
}
public void clear() {
customers.clear();
}
public List<Customer> getAll(){
return customers;
}
}
连接工厂配置
@Configuration
public class ConnectionFactoryConfiguration {
@Value("${gkz.activemq.broker.url}")
String brokerUrl;
@Value("${gkz.activemq.borker.username}")
String userName;
@Value("${gkz.activemq.borker.password}")
String password;
@Bean
public ConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(userName);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
//Used for Receiving Message
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setMessageConverter(jacksonJmsMessageConverter());
configurer.configure(factory, connectionFactory);
return factory;
}
//Used for Sending Messages.
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setMessageConverter(jacksonJmsMessageConverter());
template.setConnectionFactory(connectionFactory());
return template;
}
}
解决方案
单击 MQ 上的清除选项,稍后重新启动 MQ 服务。这将清除所有未处于待处理状态的消息。
推荐阅读
- python - Python:如何在不破坏文件结构和样式的情况下修改 XLSX 中的特定单元格?
- kubernetes - 当一个 pod 资源限制没有超过,而单个容器的资源限制超过了,会发生什么?
- mysql - pymysql.err.InterfaceError: (0, '') 在多线程 python 中执行多个查询时出错
- json - 过滤:包含 Json 对象数组的 Json
- python - python中的时间戳格式
- python - Slack API:rtm.start 有效,但 rtm.connect 在 slack_sdk.rtm_v2 中无效
- python - 如何简化“二十一点”函数以提高效率?简单的语法/函数效率查询
- bash - 我想在 bash 中将变量分配给用逗号分隔的输入
- java - 如何将缓存中存储的数据用于springboot中的另一个功能
- javascript - 将媒体轨道添加到连接后,WebRTC 数据通道连接失败