首页 > 解决方案 > 无法使用 ActiveMQ 将主题消息同时发布给两个订阅者

问题描述

我参考了《SpringBoot 应用程序发布并从 ActiveMQ 主题中读取》一文,使用 ActiveMQ 向主题发布消息。我创建了两个从该主题读取消息的接收者微服务,还创建了一个 REST 端点用于向主题发布消息。但是,我必须调用该 REST 端点两次,两个接收者才能各自收到一条消息:
1) 第一次调用 REST 端点时,消息只会发送给 Receiver1
2) 第二次调用 REST 端点时,消息只会发送给 Receiver2

因此 2 个接收者无法同时读取主题。
这是我的代码。

PublisherApplication.java

package com.springboot;

//import statements

/**
 * Spring Boot entry point of the publisher service.
 *
 * <p>Declares a JMS listener container factory switched to the pub/sub
 * (topic) domain and a {@code RestTemplate} bean.
 */
@SpringBootApplication
@EnableDiscoveryClient
@EnableJms
public class PublisherApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class, args);
    }

    /**
     * Builds the listener container factory registered as {@code topicListenerFactory}.
     *
     * @param connectionFactory JMS connection factory handed to the configurer
     * @param configurer        applies Spring Boot's defaults to the new factory
     * @return the configured factory with the pub/sub domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
        // Apply Boot's defaults (message converter included) before any customisation.
        configurer.configure(topicFactory, connectionFactory);
        // Enable the pub/sub domain afterwards so destinations are treated as topics.
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }

    /**
     * Exposes a {@code RestTemplate} built from the supplied builder.
     *
     * @param builder builder used to create the template
     * @return the built template
     */
    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        RestTemplate template = builder.build();
        return template;
    }

}

PublishMessage.java
[向主题发布消息的 REST 端点]

package com.springboot.controller;

//import statements

/**
 * REST controller that sends a fixed text message to the {@code mailbox.topic}
 * destination through the injected {@code JmsTemplate}.
 *
 * <p>NOTE(review): the template's pub/sub flag is not set in this version;
 * the solution further down this page adds {@code jmsTemplate.setPubSubDomain(true)}.
 */
@RestController
@RequestMapping(path = "/schoolDashboard/topic")
class PublishMessage {

    /** Name of the JMS destination the message is sent to. */
    public static final String MAILBOX_TOPIC = "mailbox.topic";

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Handles GET {@code /schoolDashboard/topic/sendEmail}: logs to stdout and
     * sends the fixed payload to {@link #MAILBOX_TOPIC}.
     *
     * @throws Exception declared by the original signature
     */
    @GetMapping(path = "/sendEmail")
    public void sendStudentById() throws Exception {
        final String payload = "Topic - Email Sent";
        System.out.println("Anindya-TopicSendMessage.java :: Publishing Email sent....");
        jmsTemplate.convertAndSend(MAILBOX_TOPIC, payload);
    }

}

ReceiverApplication01
[注意 - Receiver01 是第一个微服务]

package com.springboot;

//import statements

/**
 * Spring Boot entry point of the first receiver microservice.
 *
 * <p>Declares the {@code topicListenerFactory} bean referenced by the
 * {@code @JmsListener} methods of this service.
 */
@SpringBootApplication
@EnableJms
public class ReceiverApplication01 {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication01.class, args);
    }

    /**
     * Builds a listener container factory with the pub/sub domain enabled.
     *
     * @param connectionFactory JMS connection factory handed to the configurer
     * @param configurer        applies Spring Boot's defaults to the new factory
     * @return the configured factory
     */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
        // Boot defaults first (message converter included) ...
        configurer.configure(topicFactory, connectionFactory);
        // ... then enable the pub/sub domain so the destination is treated as a topic.
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }

}

TopicMesssgeReceiver01.java
[Receiver01 从主题中读取消息]

package com.springboot.message;

//import statement

/**
 * JMS listener of the first receiver: prints every message received on
 * {@code mailbox.topic} to stdout.
 */
@Component
public class TopicMesssgeReceiver01 {

    /** Destination this listener is bound to. */
    public static final String MAILBOX_TOPIC = "mailbox.topic";

    private final SimpleMessageConverter converter = new SimpleMessageConverter();

    /**
     * Converts the incoming JMS message with the local converter and prints it.
     *
     * @param message the received JMS message
     * @throws JMSException if the conversion fails at the JMS level
     */
    @JmsListener(destination = MAILBOX_TOPIC, containerFactory = "topicListenerFactory")
    public void receiveMessage(final Message message) throws JMSException {
        final Object payload = this.converter.fromMessage(message);
        // String concatenation renders a null payload as "null", matching String.valueOf.
        System.out.println("Receiver01 <" + payload + ">");
    }

}

ReceiverApplication02
[注:-Receiver02 是第二个微服务]

package com.springboot;

//import statement

/**
 * Spring Boot entry point of the second receiver microservice (version shown
 * in the question).
 *
 * <p>Declares the {@code topicListenerFactory} bean referenced by this
 * service's {@code @JmsListener}.
 */
@SpringBootApplication
@EnableJms
public class ReaderApplication02 {

    /**
     * Builds the listener container factory for this receiver.
     *
     * @param connectionFactory JMS connection factory handed to the configurer
     * @param configurer        applies Spring Boot's defaults to the new factory
     * @return the configured factory
     */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // NOTE(review): the pub/sub flag is set BEFORE configurer.configure(...) here.
        // The solution on this page states setPubSubDomain(true) should be placed after
        // the configuration call; presumably configure(...) overrides the flag -- confirm.
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);       
        return factory;
    }

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ReaderApplication02.class, args);
    }

}

TopicMesssgeReceiver02
[Receiver02 从主题中读取消息]


package com.springboot.message;

//import statement

/**
 * JMS listener of the second receiver: prints every message received on
 * {@code mailbox.topic} to stdout.
 */
@Component
public class TopicMesssgeReceiver02 {

    /** Destination this listener is bound to. */
    public static final String MAILBOX_TOPIC = "mailbox.topic";

    private final SimpleMessageConverter converter = new SimpleMessageConverter();

    /**
     * Converts the incoming JMS message with the local converter and prints it.
     *
     * @param message the received JMS message
     * @throws Exception declared by the original signature
     */
    @JmsListener(destination = MAILBOX_TOPIC, containerFactory = "topicListenerFactory")
    public void receiveMessage(final Message message) throws Exception {
        final Object payload = this.converter.fromMessage(message);
        // String concatenation renders a null payload as "null", matching String.valueOf.
        System.out.println("Receiver02 <" + payload + ">");
    }

}

标签: spring-boot activemq

解决方案


谢谢纳文!!最后,我能够做到。
我们只需要调用 setPubSubDomain(true),其余的样板代码都由 Spring Boot 处理。
现在,两个接收者微服务可以同时从 Topic 读取消息
以下是代码更改

PublishMessage.java
[向主题发布消息的 REST 端点]

package com.springboot.controller;

//import statements

/**
 * REST controller that publishes a fixed text message to the
 * {@code mailbox.topic} JMS topic.
 *
 * <p>The injected {@code JmsTemplate} is switched to the pub/sub domain once,
 * when it is injected, instead of on every HTTP request. This keeps the
 * request path free of configuration side effects on the template.
 *
 * <p>NOTE(review): the flag is set on the injected template instance itself,
 * so any other user of that instance also sends to topics (this was already
 * the case after the first request in the previous version). Setting
 * {@code spring.jms.pub-sub-domain=true} in the application properties is the
 * declarative alternative.
 */
@RestController
@RequestMapping(path = "/schoolDashboard/topic")
class PublishMessage {

    /** Name of the JMS topic the message is published to. */
    public static final String MAILBOX_TOPIC = "mailbox.topic";

    private JmsTemplate jmsTemplate;

    /**
     * Receives the JMS template from Spring and enables the pub/sub domain on it
     * so that {@link #MAILBOX_TOPIC} is resolved as a topic rather than a queue.
     *
     * @param jmsTemplate template used for publishing
     */
    @Autowired
    void setJmsTemplate(JmsTemplate jmsTemplate) {
        // setPubSubDomain(true) identifies Topic in ActiveMQ; done once at wiring time.
        jmsTemplate.setPubSubDomain(true);
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * Handles GET {@code /schoolDashboard/topic/sendEmail}: logs to stdout and
     * publishes the fixed payload to {@link #MAILBOX_TOPIC}.
     *
     * @throws Exception declared for signature compatibility with the original
     */
    @GetMapping(path = "/sendEmail")
    public void sendStudentById() throws Exception {
        System.out.println("Publisher :: Message sent...");
        jmsTemplate.convertAndSend(MAILBOX_TOPIC, "Topic - Email Sent");
    }

}

ReceiverApplication02
[注:-Receiver02 是第二个微服务]

package com.springboot;

//import statement

/**
 * Spring Boot entry point of the second receiver microservice (corrected
 * version from the solution).
 */
@SpringBootApplication
@EnableJms
public class ReaderApplication02 {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ReaderApplication02.class, args);
    }

    /**
     * Builds the listener container factory registered as {@code topicListenerFactory}.
     *
     * @param connectionFactory JMS connection factory handed to the configurer
     * @param configurer        applies Spring Boot's configuration to the new factory
     * @return the configured factory with the pub/sub domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
        configurer.configure(topicFactory, connectionFactory);
        // Order matters: setPubSubDomain(true) must come after the configurer
        // has been applied to the listener container factory.
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }

}


推荐阅读