How to set up a durable subscriber in DefaultMessageListenerContainer in Spring?

Problem description

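The producer does not send its messages with persistent delivery. When I consume a message through a MessageListener and any (runtime) exception is thrown, delivery is retried a fixed number of times (6 by default on the AMQ side) and then the message is lost.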

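The cause is that the producer does not set the delivery mode to persistent: once the retries are exhausted, no DLQ is created and the message is not moved to a DLQ, so I lose the message.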
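The behaviour described above can be sketched with a toy model. This is not the ActiveMQ API: the class `RedeliverySimulation`, its `Listener` interface, and the `Outcome` enum are hypothetical names invented for illustration. It only assumes what the question states (6 redeliveries by default) plus the broker default that non-persistent messages are not dead-lettered once retries are exhausted.

```java
// Toy model of broker-side redelivery; NOT the ActiveMQ API.
class RedeliverySimulation {

    /** Possible fates of a message in this toy model. */
    enum Outcome { CONSUMED, MOVED_TO_DLQ, DISCARDED }

    /** Hypothetical stand-in for javax.jms.MessageListener. */
    interface Listener {
        void onMessage(String body);
    }

    // Default number of redeliveries after the first delivery attempt.
    static final int MAX_REDELIVERIES = 6;

    static Outcome deliver(String body, boolean persistent, Listener listener) {
        // One initial delivery plus up to MAX_REDELIVERIES redeliveries.
        for (int attempt = 0; attempt <= MAX_REDELIVERIES; attempt++) {
            try {
                listener.onMessage(body);
                return Outcome.CONSUMED; // listener succeeded, transaction commits
            } catch (RuntimeException e) {
                // Transacted session rolls back; the message is redelivered.
            }
        }
        // Retries exhausted: in this model only persistent messages are dead-lettered.
        return persistent ? Outcome.MOVED_TO_DLQ : Outcome.DISCARDED;
    }

    public static void main(String[] args) {
        Listener alwaysFails = body -> { throw new IllegalStateException("boom"); };
        System.out.println(deliver("order-1", false, alwaysFails)); // prints DISCARDED
        System.out.println(deliver("order-1", true, alwaysFails));  // prints MOVED_TO_DLQ
    }
}
```

In this model a failing listener sees the message seven times in total (one delivery plus six redeliveries) before the message is either dead-lettered or dropped.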

My code looks like this:

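import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

@Configuration
@PropertySource("classpath:application.properties")
public class ActiveMqJmsConfig {

    // LOG and purchaseReceivingQueueName are fields of this class that are not shown here.

    @Autowired
    private AbcMessageListener abcMessageListener;

    // Registered as a bean so that Spring manages the container's lifecycle.
    @Bean
    public DefaultMessageListenerContainer purchaseMsgListenerforAMQ(
            @Qualifier("AMQConnectionFactory") ConnectionFactory amqConFactory) {

        LOG.info("Message listener for purchases from AMQ : Starting");
        DefaultMessageListenerContainer defaultMessageListenerContainer =
                new DefaultMessageListenerContainer();

        defaultMessageListenerContainer.setConnectionFactory(amqConFactory);
        defaultMessageListenerContainer.setMaxConcurrentConsumers(4);
        defaultMessageListenerContainer.setDestinationName(purchaseReceivingQueueName);
        defaultMessageListenerContainer.setMessageListener(abcMessageListener);
        // Transacted session: an exception in the listener rolls back and triggers redelivery.
        defaultMessageListenerContainer.setSessionTransacted(true);

        return defaultMessageListenerContainer;
    }

    @Bean
    @Qualifier(value = "AMQConnectionFactory")
    public ConnectionFactory activeMQConnectionFactory() {

        ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory();

        // Broker URL and credentials, written as string literals.
        amqConnectionFactory.setBrokerURL("tcp://localhost:61616");
        amqConnectionFactory.setUserName("admin");
        amqConnectionFactory.setPassword("admin");

        return amqConnectionFactory;
    }
}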

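import javax.jms.Message;
import javax.jms.MessageListener;

import org.springframework.stereotype.Component;

@Component
public class AbcMessageListener implements MessageListener {

    @Override
    public void onMessage(Message msg) {
        // CODE implementation
    }
}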

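Question: by setting a client ID at the connection level (Connection.setClientID("String")), we can subscribe as a durable subscriber even if the messages are not persistent. With that in place, if the application throws a runtime exception, then after a certain number of retry attempts a DLQ is created for the queue and the message is moved to it.

However, DefaultMessageListenerContainer does not expose the connection to the client. I assume the class itself maintains it as a pool.

How can I achieve a durable subscription with DefaultMessageListenerContainer?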

Tags: jms, spring-jms, amq

Solution


You can set the client ID on the container instead:

/**
 * Specify the JMS client ID for a shared Connection created and used
 * by this container.
 * <p>Note that client IDs need to be unique among all active Connections
 * of the underlying JMS provider. Furthermore, a client ID can only be
 * assigned if the original ConnectionFactory hasn't already assigned one.
 * @see javax.jms.Connection#setClientID
 * @see #setConnectionFactory
 */
public void setClientId(@Nullable String clientId) {
    this.clientId = clientId;
}

/**
 * Set the name of a durable subscription to create. This method switches
 * to pub-sub domain mode and activates subscription durability as well.
 * <p>The durable subscription name needs to be unique within this client's
 * JMS client id. Default is the class name of the specified message listener.
 * <p>Note: Only 1 concurrent consumer (which is the default of this
 * message listener container) is allowed for each durable subscription,
 * except for a shared durable subscription (which requires JMS 2.0).
 * @see #setPubSubDomain
 * @see #setSubscriptionDurable
 * @see #setSubscriptionShared
 * @see #setClientId
 * @see #setMessageListener
 */
public void setDurableSubscriptionName(@Nullable String durableSubscriptionName) {
    this.subscriptionName = durableSubscriptionName;
    this.subscriptionDurable = (durableSubscriptionName != null);
}
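
As a minimal sketch of how these two setters might be used together, the configuration below sets a client ID and a durable subscription name on the container. The class name `DurableSubscriberConfig`, the destination name `purchase.topic`, the client ID `purchase-service`, and the subscription name `purchase-subscription` are all hypothetical placeholders, and the sketch assumes the destination is a topic, since durable subscriptions belong to the pub-sub domain. It calls setPubSubDomain(true) explicitly instead of relying on any side effect of the other setters.

```java
import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

@Configuration
public class DurableSubscriberConfig {

    @Bean
    public DefaultMessageListenerContainer durableListenerContainer(
            ConnectionFactory connectionFactory, MessageListener listener) {

        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // Hypothetical topic name; durable subscriptions apply to topics, not queues.
        container.setDestinationName("purchase.topic");
        container.setPubSubDomain(true);

        // Hypothetical client ID; must be unique among the broker's active connections.
        container.setClientId("purchase-service");

        // Hypothetical subscription name; must be unique within this client ID.
        container.setDurableSubscriptionName("purchase-subscription");

        container.setMessageListener(listener);
        container.setSessionTransacted(true);

        // No setMaxConcurrentConsumers call: a non-shared durable subscription
        // allows only one concurrent consumer, which is the container default.
        return container;
    }
}
```

Compared with the configuration in the question, this sketch leaves out setMaxConcurrentConsumers(4), because the Javadoc quoted above allows only one concurrent consumer per durable subscription unless a shared durable subscription (JMS 2.0) is used.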
