首页 > 解决方案 > spring jmsListener 监听多个队列

问题描述

在这篇文章中,Garry Russell 解释了如何以编程方式创建多个 KafkaListener 来监听多个主题。[这个设置实际上对我来说很成功] Kafka Spring:如何动态或循环创建监听器?

现在我想为 JMSListeners 设置一个类似的设置——我可以有一个类,其中有一个 @JMSListener,我可以以编程方式创建该 JMSListener 的多个实例,每个实例都注入了自己的 queueName。

我发现这篇文章 Spring JMS start Listening to jms queues on request

在这篇文章的最后,加里发表了类似的评论,

如果您希望动态创建大量容器,则只需以编程方式创建容器,调用 afterPropertiesSet(),然后调用 start()

我使用了上面第一篇文章中的设置(与 KafkaListeners 相关),我的多个 JMS 侦听器实例正在启动但不消耗任何消息。

基本上我不明白我在哪里做这个

然后只需以编程方式创建容器,调用 afterPropertiesSet(),然后调用 start()

我对这个词感到困惑 - 容器,我知道有 JMSListener 和 JmsListenerContainerFactory,在这种情况下什么是容器 - 我猜是 JMSListener?

我已确认队列中有消息。另外,当我不以编程方式创建侦听器并且只使用一个带有硬编码队列的侦听器时,它会很好地消耗消息。

当我以编程方式创建多个 JMS 侦听器时,基本上没有侦听器正在使用消息

    @SpringBootApplication
@EnableJms
public class MqProdConsumerApplication {
    private static Logger logger = LogManager.getLogger(MqProdConsumerApplication.class.getName());
    private static Consumers consumersStatic;

    @Autowired
    Consumers consumers;

    @PostConstruct
    public void init() {
        consumersStatic = this.consumers;
    }

    @Bean
    public Gson gson() {
        return new Gson();
    }

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(MqProdConsumerApplication.class, args);
        List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
        Assert.notEmpty(queueInformationList, "queueInformationList cannot be empty");
        logger.debug("queueInformationList ************" + queueInformationList.toString());
        for (QueueInformation queueInformation : queueInformationList) {
            AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
            child.setParent(context);
            child.register(MQConfig.class);
            Properties props = new Properties();
            props.setProperty("mqQueueName", queueInformation.getMqQueueName());
            //
            PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
            child.getEnvironment().getPropertySources().addLast(pps);
            child.refresh();
        }
    }
}

这是具有 listenerContainerFactory 的 MQConfig

@Configuration
public class MQConfig {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${ibm.mq.user}")
    private String mqUserName;

    @Bean
    public MQListener listener() {
        return new MQListener();
    }

    @PostConstruct
    public void afterConstruct() {
        logger.debug("************* initialized MQ Config successfully for user =" + mqUserName);
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

        // Put the MQ username in the PCF environment.
        // Otherwise, the connection is identified by PCF's default user, "VCAP"
        System.setProperty("user.name", mqUserName);
        return factory;
    }
}

然后是具有实际 @JMSListener 的 MQListener

    public class MQListener {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${mqQueueName}")
    private String mqQueueName;

    @PostConstruct
    public void afteConstruct() {
        logger.debug("************* initialized MQ Listener successfully, will read from =" + mqQueueName);

    }

    @JmsListener(destination = "${mqQueueName}", containerFactory = "myFactory")
    public void receiveMessage(String receivedMessage) throws JAXBException, ExecutionException, InterruptedException {
        logger.debug("***********************************************receivedMessage:" + receivedMessage);
    }
}

这是我的 application.yml

    ibm.mq.queueManager: ABCTOD01
ibm.mq.channel: QMD00.SERVER
ibm.mq.connName: mqdv1.devfg.ABC.com
ibm.mq.user: pmd0app1
ibm.mq.password:
consumers:
  queueInformationList:
  -
    mqQueueName: QMD00.D.SRF.PERSON.LITE.PHONE.LOAD
  -
    mqQueueName: QMD00.D.SRF.PERSON.PHONE.LOAD

标签: javaspring-bootibm-mqspring-jms

解决方案


好的,我发现另一篇文章 Gary 回答了我正在寻找的 添加动态听众数量(Spring JMS)

基本上这是我的工作解决方案。干得好@GaryRussell - 我现在是粉丝 :)

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
        int i = 0;
        for (QueueInformation queueInformation :
                queueInformationList) {
            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
            endpoint.setId("myJmsEndpoint-" + i++);
            endpoint.setDestination(queueInformation.getMqQueueName());
            endpoint.setMessageListener(message -> {
                logger.debug("***********************************************receivedMessage:" + message);
            });
            registrar.registerEndpoint(endpoint);
            logger.debug("registered the endpoint for queue" + queueInformation.getMqQueueName());
  }
}

另请参阅https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#jms-annotated-programmatic-registration


推荐阅读