首页 > 解决方案 > 添加到 Activemq 队列时如何识别重复消息

问题描述

我在 Spring Boot 应用程序中使用 ActiveMq 设置 JMS。但无法理解如何防止发送者(消息提供者)将重复的消息添加到队列中。

我的应用程序初始 JMS 和消息转换器 bean 配置如下

@Bean
    public Queue queue() {
        return new ActiveMQQueue("pendingDocuments.queue");
    }


@Bean 
 public MessageConverter jacksonJmsMessageConverter() {
       MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setTargetType(MessageType.TEXT);
            converter.setTypeIdPropertyName("_type");
            return converter;
        }

发送方方法实现

@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

@Autowired
private Queue pendingDocumentsQueue;

public void getPendingDocuments(){

/*My Custom Java Object, Actually these objects will read from DB for every 5 min, so process no way to know either these are already added to queue or not*/

Document document= new Document();
document.setUniqueId("122212");
document.setContent("TEST CONTENT");
this.jmsMessagingTemplate.convertAndSend(this.pendingDocumentsQueue, document);

        }

我想知道如何根据文档唯一 ID 将消息(我的文档对象)添加到队列中。

标签: javaspringspring-bootjmsactivemq

解决方案


如果我正确理解这一点,您是否试图阻止您的通信处理重复消息,即使在崩溃的情况下也是如此?

通常,您不应该有重复项(ActiveMQ 不会“自己”创建那些),但是如果您考虑可能的故障点(发送方崩溃、代理崩溃、消费者崩溃),您会看到发送方和除非消费者告诉他们实际处理的最后一条消息,否则代理无法做出“重复”的决定。

这可能通过从消费者到代理的内部 ACK(防止丢失消息但不重复)或使用也可以防止重复的 JMS 事务来发生。如果您让您的发送者和消费者使用交易会话jmsTemplate.setSessionTransacted(true); (请参阅此处),您将使用如下协议进行重复的免费通信:

  1. 发件人从持久存储(可能是数据库)检查下一个 UniqueID 是什么。
  2. 发件人发送消息。
  3. 发送方提交事务,同时记下处理此 UniqueID。
  4. Broker 将使用唯一的事务 ID 传递此消息。
  5. 消费者收到消息。
  6. 消费者处理此 UniqueID 并同时提交事务。

推荐阅读