java - ActiveMQ 批量消费者
问题描述
我需要使用来自 ActiveMQ 主题的消息并将它们保存在 mongo 中。我想知道是否有一种方法/配置可以从主题中批量消费消息,而不是一个接一个地读取消息并为每条消息进行数据库调用。
我想象最终解决方案将执行以下操作:
- 以 100 的批处理大小消耗消息
- 使用 mongo bulk insert 将批次保存到 DB
- 向代理发送 ACK 以获取成功插入的消息,向代理发送 NAK 以获取失败的消息。
解决方案
JMS API 仅允许您一次接收一条消息,无论是通过JMS 1.1 还是 JMS 2 中的异步javax.jms.MessageListener
或同步调用。但是,您可以使用事务处理会话批量接收多条消息。以下是JavaDoc关于事务会话的说明:javax.jms.MessageConsumer#receive()
javax.jms.JMSConsumer.receive()
javax.jms.Session
会话可以被指定为已交易。每个事务会话都支持单个事务系列。每个事务将一组消息发送和一组消息接收组合成一个原子工作单元。实际上,事务将会话的输入消息流和输出消息流组织成一系列原子单元。当一个事务提交时,它的原子输入单元被确认并且它的相关原子输出单元被发送。如果事务回滚完成,事务发送的消息将被销毁,会话的输入会自动恢复。
因此,您可以使用事务处理会话单独接收 100 条消息,将该数据插入 Mongo,提交事务处理会话,或者如果出现故障,您可以回滚事务处理会话(这基本上充当否定确认)。例如:
final int TX_SIZE = 100;
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(topic);
connection.start();
while (true) {
List messages = new ArrayList<Message>();
for (int i = 0; i < TX_SIZE; i++) {
Message message = consumer.receive(1000);
if (message != null) {
messages.add(message);
} else {
break; // no more messages available for this batch
}
}
if (messages.size() > 0) {
try {
// bulk insert data from messages List into Mongo
session.commit();
} catch (Exception e) {
e.printStackTrace();
session.rollback();
}
} else {
break; // no more messages in the subscription
}
}
值得注意的是,如果您只使用 JMS 事务会话而不是完整的 XA 事务,那么在 Mongo 中至少会有一些重复的风险(例如,如果您的应用程序在成功将数据插入 Mongo 之后但在提交事务会话之前崩溃)。XA 事务将为您减轻这种风险,但代价是相当多的额外复杂性,具体取决于您的环境。
最后,如果您遇到 ActiveMQ “Classic” 的性能限制,请考虑使用ActiveMQ Artemis,它是 ActiveMQ 的下一代消息代理。
推荐阅读
- python - 如何将 CSV 导入到两个使用 Python 相互引用的不同 SQL 表中
- vue.js - 无法安装 vuetify
- java - 如何检查一个类是否依赖 CDI 来创建实例?
- typescript - 具有客户端实用程序定义结构的基本 TypeScript NodeJS 程序,使用严格的编码规则
- python - 无法导入 .py 文件,没有名为“文件名”的模块错误
- python - 摆脱嵌套的for循环
- android - document.data 给出无用的数据
- c - 从文件中读取文件时 fscanf 语法出现问题
- javascript - 如何将数据目标属性从选择选项应用到按钮?
- javascript - 在 Node Js 中,我如何写入从视频上传的文件块