首页 > 解决方案 > ActiveMQ 批量消费者

问题描述

我需要使用来自 ActiveMQ 主题的消息并将它们保存在 mongo 中。我想知道是否有一种方法/配置可以从主题中批量消费消息,而不是一个接一个地读取消息并为每条消息进行数据库调用。

我想象最终解决方案将执行以下操作:

  1. 以 100 的批处理大小消耗消息
  2. 使用 mongo bulk insert 将批次保存到 DB
  3. 向代理发送 ACK 以获取成功插入的消息,向代理发送 NAK 以获取失败的消息。

标签: javajmsactivemqspring-boot-actuator

解决方案


JMS API 仅允许您一次接收一条消息,无论是通过JMS 1.1 还是 JMS 2 中的异步javax.jms.MessageListener或同步调用。但是,您可以使用事务处理会话批量接收多条消息。以下是JavaDoc关于事务会话的说明:javax.jms.MessageConsumer#receive()javax.jms.JMSConsumer.receive()javax.jms.Session

会话可以被指定为已交易。每个事务会话都支持单个事务系列。每个事务将一组消息发送和一组消息接收组合成一个原子工作单元。实际上,事务将会话的输入消息流和输出消息流组织成一系列原子单元。当一个事务提交时,它的原子输入单元被确认并且它的相关原子输出单元被发送。如果事务回滚完成,事务发送的消息将被销毁,会话的输入会自动恢复。

因此,您可以使用事务处理会话单独接收 100 条消息,将该数据插入 Mongo,提交事务处理会话,或者如果出现故障,您可以回滚事务处理会话(这基本上充当否定确认)。例如:

final int TX_SIZE = 100;
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(topic);
connection.start();
while (true) {
   List messages = new ArrayList<Message>();
   for (int i = 0; i < TX_SIZE; i++) {
      Message message = consumer.receive(1000);
      if (message != null) {
         messages.add(message);
      } else {
         break; // no more messages available for this batch
      }
   }

   if (messages.size() > 0) {
      try {
         // bulk insert data from messages List into Mongo
         session.commit();
      } catch (Exception e) {
         e.printStackTrace();
         session.rollback();
      }
   } else {
      break; // no more messages in the subscription
   }
}

值得注意的是,如果您只使用 JMS 事务会话而不是完整的 XA 事务,那么在 Mongo 中至少会有一些重复的风险(例如,如果您的应用程序在成功将数据插入 Mongo 之后但在提交事务会话之前崩溃)。XA 事务将为您减轻这种风险,但代价是相当多的额外复杂性,具体取决于您的环境。

最后,如果您遇到 ActiveMQ “Classic” 的性能限制,请考虑使用ActiveMQ Artemis,它是 ActiveMQ 的下一代消息代理。


推荐阅读