首页 > 解决方案 > 将 ActiveMQ 与多个消费者实例一起使用

问题描述

在我们的架构中,我们为每个应用程序提供了 2 个或更多容器,以实现高可用性。我们正在使用 activeMQ,我想实现以下行为。

  1. 将消息推送到队列。
  2. 此消息将仅由 *one *container(第一个读取它的容器)使用。
  3. 一旦消息处理成功,消费者将更新此消息,可以确认和忽略
  4. 我尝试使用带有提交的事务并使用 CLIENT_ACKNOWLEDGE,但是在这两种情况下,两个消费者都收到了消息并对其进行了处理。

我只希望一个消费者根据可用性处理每条消息。我们的实现是用 Java 实现的。

请分享实现方法。

这是我的代码示例

 final Connection consumerConnection = connectionFactory.createConnection();
    consumerConnection.start();
    // Create a session.
    final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    final Destination consumerDestination = consumerSession.createQueue(queueName);


    // Create a message consumer from the session to the queue.
    final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
    // Begin to wait for messages.
    Queue queue = consumerSession.createQueue(queueName);
    QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
    Enumeration msgs = queueBrowser.getEnumeration();
    while (msgs.hasMoreElements()) {
        //do your things here

        ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();

        if (message == null)
            continue;

        //handle message
        System.out.println("Message received in : " + message);
        try {
            String text = message.getText();
            JSONObject messageJson = new JSONObject(text);
            consumer.receive(1000);

            String responseString = handleMessage(messageJson);

            message.acknowledge();

谢谢莫舍

标签: javaactivemq

解决方案


这里的问题是您正在确认来自QueueBrowser实例的消息,这不会产生任何影响,因为浏览器仅用于浏览消息而不是使用它们。

...
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
    ...
    ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
    ...
    try {
        ...
        message.acknowledge(); // this does nothing

实际上是在创建一个真正的消费和调用consumer.receive(1000),但您正在丢弃返回的Message实例receive()。这是Message您必须确认才能实际使用队列中的消息的实例。

...
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
...
while (msgs.hasMoreElements()) {
    ...
    try {
        ...
        Message actualMessage = consumer.receive(1000); // acknowledge this!
        ...

另一个重要的注意事项...队列浏览器收到的消息不能保证是队列的静态快照。因此,假设您的调用consumer.receive(1000)实际上是来自队列浏览器的相同消息是危险的。在我看来,您应该重新设计此逻辑以仅使用 aMessageConsumer并删除QueueBrowser.


推荐阅读