java - 将 ActiveMQ 与多个消费者实例一起使用
问题描述
在我们的架构中,我们为每个应用程序提供了 2 个或更多容器,以实现高可用性。我们正在使用 activeMQ,我想实现以下行为。
- 将消息推送到队列。
- 此消息将仅由 *one *container(第一个读取它的容器)使用。
- 一旦消息处理成功,消费者将更新此消息,可以确认和忽略
- 我尝试使用带有提交的事务并使用 CLIENT_ACKNOWLEDGE,但是在这两种情况下,两个消费者都收到了消息并对其进行了处理。
我只希望一个消费者根据可用性处理每条消息。我们的实现是用 Java 实现的。
请分享实现方法。
这是我的代码示例
final Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
// Create a session.
final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final Destination consumerDestination = consumerSession.createQueue(queueName);
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
// Begin to wait for messages.
Queue queue = consumerSession.createQueue(queueName);
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
//do your things here
ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
if (message == null)
continue;
//handle message
System.out.println("Message received in : " + message);
try {
String text = message.getText();
JSONObject messageJson = new JSONObject(text);
consumer.receive(1000);
String responseString = handleMessage(messageJson);
message.acknowledge();
谢谢莫舍
解决方案
这里的问题是您正在确认来自QueueBrowser
实例的消息,这不会产生任何影响,因为浏览器仅用于浏览消息而不是使用它们。
...
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
...
ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
...
try {
...
message.acknowledge(); // this does nothing
您实际上是在创建一个真正的消费和调用consumer.receive(1000)
,但您正在丢弃返回的Message
实例receive()
。这是Message
您必须确认才能实际使用队列中的消息的实例。
...
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
...
while (msgs.hasMoreElements()) {
...
try {
...
Message actualMessage = consumer.receive(1000); // acknowledge this!
...
另一个重要的注意事项...队列浏览器收到的消息不能保证是队列的静态快照。因此,假设您的调用consumer.receive(1000)
实际上是来自队列浏览器的相同消息是危险的。在我看来,您应该重新设计此逻辑以仅使用 aMessageConsumer
并删除QueueBrowser
.
推荐阅读
- xslt - 使用 xslt 在现有 xml 节点下追加新的 xml 节点
- xpages - XPage 无法正确呈现
- java - Tomcat中奇怪的JSON字符串失真
- html - CSS不能居中菜单和子菜单2nt行中的空白
- vue.js - 将 props 值传递给组件
- xml - 如何使用 xslt 1.0 在 xml 元素中划分字符串
- android - Android ChipGroup:芯片中的多行文本
- c# - 停止刷新 mvc 布局中的 _navigation 部分视图或仅加载 _navigation 1 次
- apache-flink - Flink:如何在数据库中保存行列表
- android - 在 NodeJS 服务器中解决 csrf 和身份验证问题的最佳方法