首页 > 解决方案 > Spring JmsListener 不接收消息

问题描述

我有一个 Spring Boot 2.1.4 应用程序,它使用DefaultJmsListenerContainerFactory通过 SSL 使用来自 IBM MQ 9.0 的消息。我注意到在某些未知情况下,侦听器与 MQ 的连接仍然存在,并且在日志记录和 MQ 资源管理器中报告为已连接,但是侦听器不使用消息并且它们保留在队列中,直到 Spring Boot 应用程序重新启动。日志不会在应用程序或队列管理器中报告任何内容。

理想情况下,我希望JmsListener识别失败状态并自行重新连接。我已经阅读了在 MQ 上配置 Heartbeat 或 Keep Alive 并且客户端会有所帮助。MQ 通道设置为 5 秒心跳间隔,但不确定如何在 Spring Boot 应用程序中配置此设置或保持活动。

注意:JmsListener如果 MQ 连接发生异常,my 会自行重新连接。

Jms工厂代码:

@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
    defaultJmsListenerContainerFactory.setTransactionManager(jmsTransactionManager(connectionFactory));
    defaultJmsListenerContainerFactory.setSessionTransacted(true);
    defaultJmsListenerContainerFactory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
    defaultJmsListenerContainerFactory.setErrorHandler(new DefaultJmsErrorHandler());
    defaultJmsListenerContainerFactory.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
    configurer.configure(defaultJmsListenerContainerFactory, connectionFactory);
    return defaultJmsListenerContainerFactory;
}

听众:

@JmsListener(destination = "${sys.to.app.core.queue}", containerFactory = "myFactory")
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void onMessageReceived(final Message message) throws InboundQueueMessageException {...}
    

标签: javaibm-mqspring-jms

解决方案


理想情况下,它应该是负责连接失败并重新连接的底层管道,但如果您必须这样做,那么这可能会有所帮助。

假设您有一个侦听器设置,例如:

    @JmsListener(destination = "${queue.name1}",
            containerFactory = "myListenerFactory",
            id = "Q1Object")
    public void receiveOO(MyMessageData data) {
       ...
    }

然后您可以为侦听器工厂注册一个错误处理程序。您可以在其中检查侦听器是否正在运行,如果没有,请重新启动它。



import lombok.SneakyThrows;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.*;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.util.ErrorHandler;

import javax.jms.ConnectionFactory;


@EnableJms
@Configuration
public class MQConfigurationAndHandler implements ErrorHandler {

    protected final Log logger = LogFactory.getLog(getClass());

    private final ConnectionFactory connectionFactory;

    // The listener registry allows us to control the @JmsListener endpoints
    private final JmsListenerEndpointRegistry registry;

    public MQConfigurationAndHandler(ConnectionFactory connectionFactory,
                                     JmsListenerEndpointRegistry registry) {
        this.connectionFactory = connectionFactory;
        this.registry = registry;
    }


    @SneakyThrows
    @Override
    public void handleError(Throwable e) {
        logger.warn("Listener error has been thrown");
        logger.warn(e.getMessage());

//        logger.info("Registered listeners IDs are : ");
//        for (String listenerId : registry.getListenerContainerIds()) {
//            logger.info("ID " + listenerId);
//        }

        logger.info("Checking listener");
        MessageListenerContainer mlc = registry.getListenerContainer("Q1Object");
        if (! mlc.isRunning()) {
            logger.warn("Listener has stopped running attempting restart");
            mlc.start();
        } else {
            logger.warn("Listener is running, something else is throwing an error");
            throw e;
        }
    }

    @Bean("myListenerFactory")
    public DefaultJmsListenerContainerFactory myCustomisedListenerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(this);

        return factory;
    }

}


推荐阅读