首页 > 技术文章 > ActiveMQ--Spring整合之队列生产者

zzhAylm 2021-05-28 20:23 原文

  1. Maven修改,需要添加Spring支持的JMS包
    1. pom.xml
    2. <?xml version="1.0" encoding="UTF-8"?>

      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>com.model</groupId>
      <artifactId>activemq_demo02</artifactId>
      <version>1.0-SNAPSHOT</version>
      <packaging>war</packaging>

      <name>activemq_demo02 Maven Webapp</name>
      <!-- FIXME change it to the project's website -->
      <url>http://www.example.com</url>

      <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
      </properties>

      <dependencies>
      <!-- activemq所需要的jar包-->
      <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.9</version>
      </dependency>
      <dependency>
      <groupId>org.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>2.1</version>
      </dependency>
      <!--caused by: java.long.classNotFoundException:com.fasterxml.jackson.databind.ObjectMapper-->
      <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.5</version>
      </dependency>
      <!--activemq对JMS的支持,整合spring和activemq-->
      <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>4.3.23.RELEASE</version>
      </dependency>
      <!--activemq所需要的pool包配置-->
      <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.15.9</version>
      </dependency>
      <!--spring-AOP等相关jar-->
      <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>4.3.23.RELEASE</version>
      </dependency>
      <!--spring核心ioc-->
      <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>4.3.23.RELEASE</version>
      </dependency>
      <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-orm</artifactId>
      <version>4.3.23.RELEASE</version>
      </dependency>
      <dependency>
      <groupId>org.aspectj</groupId>
      <artifactId>aspectjrt</artifactId>
      <version>1.6.1</version>
      </dependency>
      <dependency>
      <groupId>org.aspectj</groupId>
      <artifactId>aspectjweaver</artifactId>
      <version>1.9.5</version>
      </dependency>
      <dependency>
      <groupId>cglib</groupId>
      <artifactId>cglib</artifactId>
      <version>2.1_2</version>
      </dependency>

      <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.12</version>
      <scope>provided</scope>
      </dependency>
      <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
      </dependency>

      </dependencies>
      </project>
          
  2. spring配置文件applicationContext.xml
    1.  
      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://www.springframework.org/schema/context
      http://www.springframework.org/schema/context/spring-context.xsd">

      <context:component-scan base-package="com.model.activemq"/>
      <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
      <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://192.168.56.130:61616"/>
      </bean>
      </property>
      <property name="maxConnections" value="100"/>
      </bean>
      <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg index="0" value="spring-active-queue"/>
      </bean>
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="jmsFactory"/>
      <property name="defaultDestination" ref="destinationQueue"/>
      <property name="messageConverter">
      <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
      </property>
      </bean>

      </beans>
       
    2. 生产者:SpringMQ_Produce.java
      1. package com.model.activemq;
        
        
        import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.context.ApplicationContext;
        import org.springframework.jms.core.JmsTemplate;
        import org.springframework.stereotype.Service;
        
        
        import javax.jms.TextMessage;
        
        
        @Service
        public class SpringMQ_Produce {
        
            @Autowired
            private JmsTemplate jmsTemplate;
        
            public static void main(String[] args) {
        
                ApplicationContext act=new ClassPathXmlApplicationContext("applicationContext.xml");
                SpringMQ_Produce produce = (SpringMQ_Produce) act.getBean("springMQ_Produce");
                produce.jmsTemplate.send((session -> {
                    TextMessage textMessage=session.createTextMessage("*******activemq*生产者生产的消息:********");
                    return textMessage;
                }));
            }
        }
    3.  

      消费者SpringMQ_Consumer
      1. package com.model.activemq;
        
        import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.context.ApplicationContext;
        import org.springframework.jms.core.JmsTemplate;
        import org.springframework.stereotype.Service;
        
        @Service
        public class SpringMQ_Consumer {
            @Autowired
            private JmsTemplate jmsTemplate;
        
            public static void main(String[] args) {
                ApplicationContext act=new ClassPathXmlApplicationContext("applicationContext.xml");
                SpringMQ_Consumer consumer = (SpringMQ_Consumer) act.getBean("springMQ_Consumer");
                String resValue = (String) consumer.jmsTemplate.receiveAndConvert();
                System.out.println("****消费者消费消息:**"+resValue);
            }
        } 
              
  3. 队列  
    1. 配置文件修改applicationContext.xml:  
    2. <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://www.springframework.org/schema/context
      http://www.springframework.org/schema/context/spring-context.xsd">


      <!-- 注解扫描包,带有@service注解的类加入到spring容器中-->
      <context:component-scan base-package="com.model.activemq"/>

      <!--配置生产者-->
      <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
      <!--真正可以Connection的ConnectionFactory,由对应的JMS服务厂提供-->
      <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://192.168.56.130:61616"/>
      </bean>
      </property>
      <property name="maxConnections" value="100"/>
      </bean>

      <!-- 这是队列,点对点的-->
      <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg index="0" value="spring-active-queue"/>
      </bean>

      <!--这是主题,点对点的-->
      <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
      <constructor-arg index="0" value="spring-active-topic"/>
      </bean>
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="jmsFactory"/>
      <!--绑定时队列还时topic,只需要修改这里就可以了-->
      <property name="defaultDestination" ref="destinationQueue"/>
      <property name="messageConverter">
      <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
      </property>
      </bean>

      </beans>

       

        
  4. 主题
    1. 配文件修改applicationContext.xml:
    2. <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://www.springframework.org/schema/context
      http://www.springframework.org/schema/context/spring-context.xsd">


      <!-- 注解扫描包,带有@service注解的类加入到spring容器中-->
      <context:component-scan base-package="com.model.activemq"/>

      <!--配置生产者-->
      <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
      <!--真正可以Connection的ConnectionFactory,由对应的JMS服务厂提供-->
      <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://192.168.56.130:61616"/>
      </bean>
      </property>
      <property name="maxConnections" value="100"/>
      </bean>

      <!-- 这是队列,点对点的-->
      <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg index="0" value="spring-active-queue"/>
      </bean>

      <!--这是主题,点对点的-->
      <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
      <constructor-arg index="0" value="spring-active-topic"/>
      </bean>
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="jmsFactory"/>
      <!--绑定时队列还时topic,只需要修改这里就可以了-->
      <property name="defaultDestination" ref="destinationTopic"/>
      <property name="messageConverter">
      <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
      </property>
      </bean>

      </beans>

       

  5. 在spring里面实现消费者不启动,直接通过配置监听完成
    1. 监听器使用,在生产topic时正常情况我们必须先启动消费者,在启动生产者,
    2. 当我们使用监听程序后,我们可以直接启动生产者,生产消息,消费者会自动消费。这就是监听程序
    3. 实现
      1. applicationContext.xml增加一个监听器bean
        1. <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xmlns:context="http://www.springframework.org/schema/context"
                 xsi:schemaLocation="http://www.springframework.org/schema/beans
                 http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                 http://www.springframework.org/schema/context
                 http://www.springframework.org/schema/context/spring-context.xsd">
          
          
          <!--    注解扫描包,带有@service注解的类加入到spring容器中-->
            <context:component-scan base-package="com.model.activemq"/>
          
              <!--配置生产者-->
              <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
                 <!--真正可以Connection的ConnectionFactory,由对应的JMS服务厂提供-->
                  <property name="connectionFactory">
                      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                          <property name="brokerURL" value="tcp://192.168.56.130:61616"/>
                      </bean>
                  </property>
                  <property name="maxConnections" value="100"/>
              </bean>
          
              <!-- 这是队列,点对点的-->
              <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
                  <constructor-arg index="0" value="spring-active-queue"/>
              </bean>
          
              <!--这是主题,点对点的-->
              <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
                  <constructor-arg index="0" value="spring-active-topic"/>
              </bean>
              <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
                  <property name="connectionFactory" ref="jmsFactory"/>
                  <!--绑定时队列还时topic-->
                  <property name="defaultDestination" ref="destinationTopic"/>
                  <property name="messageConverter">
                      <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
                  </property>
              </bean>
          
              <!--配置监听程序  -->
              <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                  <property name="connectionFactory" ref="jmsFactory"/>
                  <property name="destination" ref="destinationTopic"/>
                  <!-- public class MyMessageListener implement MessageLister-->
                  <property name="messageListener" ref="myMessageListener"/>
              </bean>
          
          </beans>
      2. 实现监听程序中的myMessageListenener类,并将其加入到spring的容器中

        1. package com.model.activemq;
          
          import org.springframework.stereotype.Component;
          
          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageListener;
          import javax.jms.TextMessage;
          
          @Component
          public class MyMessageListener implements MessageListener {
              @Override
              public void onMessage(Message message) {
                  if (null!=message&&message instanceof TextMessage){
                      TextMessage textMessage= (TextMessage) message;
                      try {
                          System.out.println(textMessage.getText());
                      } catch (JMSException e) {
                          e.printStackTrace();
                      }
                  }
              }
          }
      3. 成功实现,我们直接启动生产者生产topic就可以由消费者自东进行消费。   

           

推荐阅读