首页 > 技术文章 > RabbitMq 初学五大模式 通俗易懂 超详细 【包含案例】

beixuan 2020-07-24 15:26 原文

RabbitMQ五种工作模式

HelloWorld 简单模式

WorkQueues 工作队列模式

Publish/Subscribe 发布/订阅模式

Routing 路由模式

Topic 通配符模式

五大模式总结

一、HelloWorld 简单模式

1.创建Maven工程(我用的IDEA)

File[文件] -> New[新建] -> Project[工程] -> Maven[选择Maven] -> Next[直接下一步] -> Name[输入项目名称] —> Finish[完成]

2.在项目里创建两个子工程

Producer 消息生产者

项目名称位置右键 -> New[新建] -> Module[组件] -> Maven[选择Maven] -> Next[下一步] -> Name[输入Producer] —> Finish[完成]

Consumer 消息消费者

项目名称位置右键 -> New[新建] -> Module[组件] -> Maven[选择Maven] -> Next[下一步] -> Name[输入Consumer] —> Finish[完成]

3.在主项目工程的pom文件里填写依赖(注意是主项目 两个子项目会继承父项目的依赖)

<dependencies>
    <!--com.rabbitmq:amqp-client RabbitMq依赖 [重要]-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.3</version>
    </dependency>
    
    <!--下面三个依赖是为了方便控制台输出Log [一般]-->
    <!--junit:junit 单元测试框架 用了都说好-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
    
    <!--org.projectlombok:lombok 整合注解-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
    
    <!--ch.qos.logback:logback-classic 日志框架-->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>

4.编写生产者

4.1.在Producer项目里创建Java类文件

src/main/java/包名*/ProducerApplication.java

4.2.撸码(具体看注释)

 import com.rabbitmq.client.*;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.Test;
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
 /**
  * RabbitMQ - 简单模式 生产者
  * @author Admin
  */
 @Slf4j
 public class ProducerApplication {
     
     //定义Queue队列的名称
     private final static String SIMPLE_QUEUE = "simple";
     
     public static ConnectionFactory getFactory(){
         //设置连接工程
         ConnectionFactory factory = new ConnectionFactory();
         //设置 host
         factory.setHost("localhost");
         //设置 port 默认为5672
         factory.setPort(5672);
         //设置 virtualHost 
         factory.setVirtualHost("/");
         //设置 账号和密码
         factory.setUsername("username");
         factory.setPassword("password");
         return factory;
     }
 
     /**
      * 生产者
      */
     @Test
     public void testSend(){
         try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             /*
              * channel.queueDeclare();
              * Param1: 队列名称
              * Param2: 是否持久化
              * Param3: 是否具有独占性
              * Param4: 是否自动删除队列
              * Param5: 具体参数
              */
             channel.queueDeclare(SIMPLE_QUEUE,true,false,false,null);
             //发布信息到队列
             String message = "简单模式 —> 发送第1条信息";
             channel.basicPublish("",SIMPLE_QUEUE, null, message.getBytes());
             log.debug("发送信息成功");
 
             //关闭资源
             channel.close();
             connection.close();
         } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
         } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
         }
     }
 }

5.编写消费者

5.1.在Consumer项目里创建Java类文件

src/main/java/包名*/ConsumerApplication.java

5.2.撸码(重点看注解)

   import com.rabbitmq.client.*;
   import lombok.extern.slf4j.Slf4j;
   import org.junit.Test;
   import java.io.IOException;
   import java.util.concurrent.TimeoutException;
   
   /**
    * RabbitMQ - 简单模式 消费者
    * @author Admin
    */
   @Slf4j
   public class ConsumerApplication {
       
       //定义Queue队列的名称
       private final static String SIMPLE_QUEUE = "simple";
       
       public static ConnectionFactory getFactory(){
           //设置连接工程
           ConnectionFactory factory = new ConnectionFactory();
           //设置 host
           factory.setHost("localhost");
           //设置 port 默认为5672
           factory.setPort(5672);
           //设置 virtualHost 
           factory.setVirtualHost("/");
           //设置 账号和密码
           factory.setUsername("username");
           factory.setPassword("password");
           return factory;
       }
   
         /**
          * 消费者
          */
       public static void main(String[] args) {
          try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             //回调方法
             DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                 //接受信息到队列
                 String message = new String(delivery.getBody(),"UTF-8");
                 log.info(" [√] 接受到消息:"+message);
             };
     
             channel.basicConsume(SIMPLE_QUEUE,deliverCallback,consumerTag ->{});
     
             //consumer不关闭channel 和 connection 因为要一直监视消息

          } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
          } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
          }
       }
   }

二、WorkQueues 工作队列模式

知识点:Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。

1 - 3步与简单模式一致

过去看看

4.编写生产者

4.1.在Producer项目里创建Java类文件

src/main/java/包名*/ProducerApplication.java

4.2.撸码(具体看注释)

 import com.rabbitmq.client.*;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.Test;
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
 /**
  * RabbitMQ - 简单模式 生产者
  * @author Admin
  */
 @Slf4j
 public class ProducerApplication {
     
     //定义Queue队列的名称
     private final static String WORK_QUEUE = "work_queue";
     
     public static ConnectionFactory getFactory(){
         //设置连接工程
         ConnectionFactory factory = new ConnectionFactory();
         //设置 host
         factory.setHost("localhost");
         //设置 port 默认为5672
         factory.setPort(5672);
         //设置 virtualHost 
         factory.setVirtualHost("/");
         //设置 账号和密码
         factory.setUsername("username");
         factory.setPassword("password");
         return factory;
     }
 
     /**
      * 生产者
      */
     @Test
     public void testSend(){
         try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             /*
              * channel.queueDeclare();
              * Param1: 队列名称
              * Param2: 是否持久化
              * Param3: 是否具有独占性
              * Param4: 是否自动删除队列
              * Param5: 具体参数
              */
             channel.queueDeclare(WORK_QUEUE,true,false,false,null);
             //发布信息到队列
             for (int i = 1; i <= 20; i++){ 
                String message = "工作队列模式 —> 发送第" + i + "条信息";
                channel.basicPublish("",WORK_QUEUE, null, message.getBytes());
             }
             log.debug("发送信息成功");
             //关闭资源
             channel.close();
             connection.close();
         } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
         } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
         }
     }
 }

5.编写消费者

5.1.在Consumer项目里创建Java类文件

src/main/java/包名*/ConsumerApplication01.java
src/main/java/包名*/ConsumerApplication02.java

5.2.撸码(重点看注解)

消费者01

   import com.rabbitmq.client.*;
   import lombok.extern.slf4j.Slf4j;
   import org.junit.Test;
   import java.io.IOException;
   import java.util.concurrent.TimeoutException;
   
   /**
    * RabbitMQ - 工作队列模式 消费者01
    * @author Admin
    */
   @Slf4j
   public class ConsumerApplication01 {
       
       //定义Queue队列的名称
       private final static String WORK_QUEUE = "work_queue";
       
       public static ConnectionFactory getFactory(){
           //设置连接工程
           ConnectionFactory factory = new ConnectionFactory();
           //设置 host
           factory.setHost("localhost");
           //设置 port 默认为5672
           factory.setPort(5672);
           //设置 virtualHost 
           factory.setVirtualHost("/");
           //设置 账号和密码
           factory.setUsername("username");
           factory.setPassword("password");
           return factory;
       }
   
         /**
          * 消费者
          */
       public static void main(String[] args) {
          try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();

             //一次只能接收并处理一个消息
             channel.basicQos(1);

             //回调方法
             DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                 //接受信息到队列
                 String message = new String(delivery.getBody(),"UTF-8");
                 log.info(" [√] 接受到消息:"+message);
                 try {
                     //消费者01 模拟延迟1秒
                     Thread.sleep(1000);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 } finally {
                     //消息消费完后自动确认发送到MQ
                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                 }
             };
     
             channel.basicConsume(WORK_QUEUE,deliverCallback,consumerTag ->{});
     
             //consumer不关闭channel 和 connection 因为要一直监视消息

          } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
          } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
          }
       }
   }

消费者02

   import com.rabbitmq.client.*;
   import lombok.extern.slf4j.Slf4j;
   import org.junit.Test;
   import java.io.IOException;
   import java.util.concurrent.TimeoutException;
   
   /**
    * RabbitMQ - 工作队列模式 消费者02
    * @author Admin
    */
   @Slf4j
   public class ConsumerApplication02 {
       
       //定义Queue队列的名称
       private final static String WORK_QUEUE = "work_queue";
       
       public static ConnectionFactory getFactory(){
           //设置连接工程
           ConnectionFactory factory = new ConnectionFactory();
           //设置 host
           factory.setHost("localhost");
           //设置 port 默认为5672
           factory.setPort(5672);
           //设置 virtualHost 
           factory.setVirtualHost("/");
           //设置 账号和密码
           factory.setUsername("username");
           factory.setPassword("password");
           return factory;
       }
   
         /**
          * 消费者
          */
       public static void main(String[] args) {
          try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();

             //一次只能接收并处理一个消息
             channel.basicQos(1);

             //回调方法
             DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                 //接受信息到队列
                 String message = new String(delivery.getBody(),"UTF-8");
                 log.info(" [√] 接受到消息:"+message);
                 try {
                     //消费者02 模拟延迟2秒
                     Thread.sleep(2000);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 } finally {
                     //消息消费完后自动确认发送到MQ
                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                 }
             };
     
             channel.basicConsume(WORK_QUEUE,deliverCallback,consumerTag ->{});
     
             //consumer不关闭channel 和 connection 因为要一直监视消息

          } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
          } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
          }
       }
   }

6.测试

先运行两个消费者,在启动生产者

7.小结

一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

三、Publish/Subscribe 发布/订阅模式

知识点:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

1 - 3步与简单模式一致

过去看看

4.编写生产者

4.1.在Producer项目里创建Java类文件

src/main/java/包名*/ProducerApplication.java

4.2.撸码(具体看注释)

 import com.rabbitmq.client.*;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.Test;
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
 /**
  * RabbitMQ - 发布订阅模式 生产者
  * @author Admin
  */
 @Slf4j
 public class ProducerApplication {
     
     //交换机名称
     static final String FANOUT_EXCHANGE = "fanout_exchange";
     //队列名称
     static final String FANOUT_QUEUE_1 = "fanout_queue_1";
     //队列名称
     static final String FANOUT_QUEUE_2 = "fanout_queue_2";
     
     public static ConnectionFactory getFactory(){
         //设置连接工程
         ConnectionFactory factory = new ConnectionFactory();
         //设置 host
         factory.setHost("localhost");
         //设置 port 默认为5672
         factory.setPort(5672);
         //设置 virtualHost 
         factory.setVirtualHost("/");
         //设置 账号和密码
         factory.setUsername("username");
         factory.setPassword("password");
         return factory;
     }
 
     /**
      * 生产者
      */
     @Test
     public void testSend(){
         try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             /**
              * 声明交换机
              * 参数1:交换机名称
              * 参数2:交换机类型,fanout、topic、direct、headers
              */
             channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
     
             // 声明(创建)队列
             /**
              * 参数1:队列名称
              * 参数2:是否定义持久化队列
              * 参数3:是否独占本次连接
              * 参数4:是否在不使用的时候自动删除队列
              * 参数5:队列其它参数
              */
             channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
             channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
             //发布信息到队列
             for (int i = 1; i <= 20; i++){ 
                String message = "发布订阅模式 —> 发送第" + i + "条信息";
                channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());
             }
             log.debug("发送信息成功");
             //关闭资源
             channel.close();
             connection.close();
         } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
         } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
         }
     }
 }

5.编写消费者

5.1.在Consumer项目里创建Java类文件

src/main/java/包名*/ConsumerApplication01.java
src/main/java/包名*/ConsumerApplication02.java

5.2.撸码(重点看注解)

消费者01

   import com.rabbitmq.client.*;
   import lombok.extern.slf4j.Slf4j;
   import org.junit.Test;
   import java.io.IOException;
   import java.util.concurrent.TimeoutException;
   
   /**
    * RabbitMQ - 简单模式 消费者
    * @author Admin
    */
   @Slf4j
   public class ConsumerApplication01 {
       
       //交换机名称
       static final String FANOUT_EXCHANGE = "fanout_exchange";
       //队列名称
       static final String FANOUT_QUEUE_1 = "fanout_queue_1";
       
       public static ConnectionFactory getFactory(){
           //设置连接工程
           ConnectionFactory factory = new ConnectionFactory();
           //设置 host
           factory.setHost("localhost");
           //设置 port 默认为5672
           factory.setPort(5672);
           //设置 virtualHost 
           factory.setVirtualHost("/");
           //设置 账号和密码
           factory.setUsername("username");
           factory.setPassword("password");
           return factory;
       }
   
         /**
          * 消费者
          */
       public static void main(String[] args) {
          try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             // 声明(创建)队列
             /**
              * 参数1:队列名称
              * 参数2:是否定义持久化队列
              * 参数3:是否独占本次连接
              * 参数4:是否在不使用的时候自动删除队列
              * 参数5:队列其它参数
              */
              channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
             
             //队列绑定交换机
             channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");

             //回调方法
             DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                 //接受信息到队列
                 String message = new String(delivery.getBody(),"UTF-8");
                 log.info(" [√] 接受到消息:"+message);
             };
     
             channel.basicConsume(FANOUT_QUEUE_1,true,deliverCallback,consumerTag ->{});
     
             //consumer不关闭channel 和 connection 因为要一直监视消息

          } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
          } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
          }
       }
   }

消费者02

   import com.rabbitmq.client.*;
   import lombok.extern.slf4j.Slf4j;
   import org.junit.Test;
   import java.io.IOException;
   import java.util.concurrent.TimeoutException;
   
   /**
    * RabbitMQ - 简单模式 消费者
    * @author Admin
    */
   @Slf4j
   public class ConsumerApplication02 {
       
       //交换机名称
       static final String FANOUT_EXCHANGE = "fanout_exchange";
       //队列名称
       static final String FANOUT_QUEUE_2 = "fanout_queue_2";
     
       public static ConnectionFactory getFactory(){
           //设置连接工程
           ConnectionFactory factory = new ConnectionFactory();
           //设置 host
           factory.setHost("localhost");
           //设置 port 默认为5672
           factory.setPort(5672);
           //设置 virtualHost 
           factory.setVirtualHost("/");
           //设置 账号和密码
           factory.setUsername("username");
           factory.setPassword("password");
           return factory;
       }
 
         /**
          * 消费者
          */
       public static void main(String[] args) {
          try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             // 声明(创建)队列
             /**
              * 参数1:队列名称
              * 参数2:是否定义持久化队列
              * 参数3:是否独占本次连接
              * 参数4:是否在不使用的时候自动删除队列
              * 参数5:队列其它参数
              */
              channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
             
             //队列绑定交换机
             channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");
 
             //回调方法
             DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                 //接受信息到队列
                 String message = new String(delivery.getBody(),"UTF-8");
                 log.info(" [√] 接受到消息:"+message);
             };
   
             channel.basicConsume(FANOUT_QUEUE_2,true,deliverCallback,consumerTag ->{});
   
             //consumer不关闭channel 和 connection 因为要一直监视消息
 
          } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
          } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
          }
       }
   }

6.测试

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。

7.小结

发布订阅模式与工作队列模式的区别
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机。

四、Routing 路由模式

知识点:
1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
2. 息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
3. Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

1 - 3步与简单模式一致

过去看看

4.编写生产者

4.1.在Producer项目里创建Java类文件

src/main/java/包名*/ProducerApplication.java

4.2.撸码(具体看注释)

 import com.rabbitmq.client.*;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.Test;
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
 /**
  * RabbitMQ - 路由模式 生产者
  * @author Admin
  */
 @Slf4j
 public class ProducerApplication {
     
     //交换机名称
     static final String DIRECT_EXCHANGE = "direct_exchange";
     //队列名称
     static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
     //队列名称
     static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
     
     public static ConnectionFactory getFactory(){
         //设置连接工程
         ConnectionFactory factory = new ConnectionFactory();
         //设置 host
         factory.setHost("localhost");
         //设置 port 默认为5672
         factory.setPort(5672);
         //设置 virtualHost 
         factory.setVirtualHost("/");
         //设置 账号和密码
         factory.setUsername("username");
         factory.setPassword("password");
         return factory;
     }
 
     /**
      * 生产者
      */
     @Test
     public void testSend(){
         try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             /**
              * 声明交换机
              * 参数1:交换机名称
              * 参数2:交换机类型,fanout、topic、direct、headers
              */
             channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
     
             // 声明(创建)队列
             /**
              * 参数1:队列名称
              * 参数2:是否定义持久化队列
              * 参数3:是否独占本次连接
              * 参数4:是否在不使用的时候自动删除队列
              * 参数5:队列其它参数
              */
             channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
             channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
             //队列绑定交换机

             channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHANGE, "insert");
             channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHANGE, "update");

             //发布信息到队列
             String insert = "路由模式发布消息 Routing为:insert";
             channel.basicPublish(DIRECT_EXCHANGE, "insert", null, insert.getBytes());
             String update = "路由模式发布消息 Routing为:update";
             channel.basicPublish(DIRECT_EXCHANGE, "update", null, update.getBytes());

             log.debug("发送信息成功");
             //关闭资源
             channel.close();
             connection.close();
         } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
         } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
         }
     }
 }

5.编写消费者

5.1.在Consumer项目里创建Java类文件

src/main/java/包名*/ConsumerApplication01.java
src/main/java/包名*/ConsumerApplication02.java

5.2.撸码(重点看注解)

消费者01

   import com.rabbitmq.client.*;
   import lombok.extern.slf4j.Slf4j;
   import org.junit.Test;
   import java.io.IOException;
   import java.util.concurrent.TimeoutException;
   
   /**
    * RabbitMQ - 路由模式 消费者
    * @author Admin
    */
   @Slf4j
   public class ConsumerApplication01 {
       
       //交换机名称
       static final String DIRECT_EXCHANGE = "direct_exchange";
       //队列名称
       static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
       
       public static ConnectionFactory getFactory(){
           //设置连接工程
           ConnectionFactory factory = new ConnectionFactory();
           //设置 host
           factory.setHost("localhost");
           //设置 port 默认为5672
           factory.setPort(5672);
           //设置 virtualHost 
           factory.setVirtualHost("/");
           //设置 账号和密码
           factory.setUsername("username");
           factory.setPassword("password");
           return factory;
       }
   
         /**
          * 消费者
          */
       public static void main(String[] args) {
          try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             // 声明(创建)队列
             /**
              * 参数1:队列名称
              * 参数2:是否定义持久化队列
              * 参数3:是否独占本次连接
              * 参数4:是否在不使用的时候自动删除队列
              * 参数5:队列其它参数
              */
              channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
             
             //队列绑定交换机
             channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHANGE, "insert");

             //回调方法
             DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                 //接受信息到队列
                 String message = new String(delivery.getBody(),"UTF-8");
                 log.info(" [√] 接受到消息:"+message);
             };
     
             channel.basicConsume(DIRECT_QUEUE_INSERT,true,deliverCallback,consumerTag ->{});
     
             //consumer不关闭channel 和 connection 因为要一直监视消息

          } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
          } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
          }
       }
   }

消费者02

   import com.rabbitmq.client.*;
   import lombok.extern.slf4j.Slf4j;
   import org.junit.Test;
   import java.io.IOException;
   import java.util.concurrent.TimeoutException;
   
   /**
    * RabbitMQ - 路由模式 消费者
    * @author Admin
    */
   @Slf4j
   public class ConsumerApplication02 {
       
       //交换机名称
       static final String DIRECT_EXCHANGE = "direct_exchange";
       //队列名称
       static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
       
       public static ConnectionFactory getFactory(){
           //设置连接工程
           ConnectionFactory factory = new ConnectionFactory();
           //设置 host
           factory.setHost("localhost");
           //设置 port 默认为5672
           factory.setPort(5672);
           //设置 virtualHost 
           factory.setVirtualHost("/");
           //设置 账号和密码
           factory.setUsername("username");
           factory.setPassword("password");
           return factory;
       }
   
         /**
          * 消费者
          */
       public static void main(String[] args) {
          try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             // 声明(创建)队列
             /**
              * 参数1:队列名称
              * 参数2:是否定义持久化队列
              * 参数3:是否独占本次连接
              * 参数4:是否在不使用的时候自动删除队列
              * 参数5:队列其它参数
              */
              channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
             
             //队列绑定交换机
             channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHANGE, "insert");

             //回调方法
             DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                 //接受信息到队列
                 String message = new String(delivery.getBody(),"UTF-8");
                 log.info(" [√] 接受到消息:"+message);
             };
     
             channel.basicConsume(DIRECT_QUEUE_UPDATE,true,deliverCallback,consumerTag ->{});
     
             //consumer不关闭channel 和 connection 因为要一直监视消息

          } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
          } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
          }
       }
   }

6.测试

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。

7.小结

Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

五、Topic 通配符模式

知识点:
1. Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
2. Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
3. 通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词

1 - 3步与简单模式一致

过去看看

4.编写生产者

4.1.在Producer项目里创建Java类文件

src/main/java/包名*/ProducerApplication.java

4.2.撸码(具体看注释)

 import com.rabbitmq.client.*;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.Test;
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
 /**
  * RabbitMQ - 发布订阅模式 生产者
  * @author Admin
  */
 @Slf4j
 public class ProducerApplication {
     
     //交换机名称
     static final String TOPIC_EXCHANGE = "topic_exchange";
     //队列名称
     static final String TOPIC_QUEUE_1 = "topic_queue_1";
     //队列名称
     static final String TOPIC_QUEUE_2 = "topic_queue_2";
     
     public static ConnectionFactory getFactory(){
         //设置连接工程
         ConnectionFactory factory = new ConnectionFactory();
         //设置 host
         factory.setHost("localhost");
         //设置 port 默认为5672
         factory.setPort(5672);
         //设置 virtualHost 
         factory.setVirtualHost("/");
         //设置 账号和密码
         factory.setUsername("username");
         factory.setPassword("password");
         return factory;
     }
 
     /**
      * 生产者
      */
     @Test
     public void testSend(){
         try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             /**
              * 声明交换机
              * 参数1:交换机名称
              * 参数2:交换机类型,fanout、topic、direct、headers
              */
             channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
     
             //发布信息到队列
             String insert = "路由模式发布消息 Routing为:item.insert";
             channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, insert.getBytes());
             String update = "路由模式发布消息 Routing为:item.update";
             channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, update.getBytes());
             String update = "路由模式发布消息 Routing为:item.delete";
             channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, update.getBytes());

             log.debug("发送信息成功");
             //关闭资源
             channel.close();
             connection.close();
         } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
         } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
         }
     }
 }

5.编写消费者

5.1.在Consumer项目里创建Java类文件

src/main/java/包名*/ConsumerApplication01.java
src/main/java/包名*/ConsumerApplication02.java

5.2.撸码(重点看注解)

消费者01

   import com.rabbitmq.client.*;
   import lombok.extern.slf4j.Slf4j;
   import org.junit.Test;
   import java.io.IOException;
   import java.util.concurrent.TimeoutException;
   
   /**
    * RabbitMQ - 通配符模式 消费者
    * @author Admin
    */
   @Slf4j
   public class ConsumerApplication01 {
       
       //交换机名称
       static final String TOPIC_EXCHANGE = "topic_exchange";
       //队列名称
       static final String TOPIC_QUEUE_1 = "topic_queue_1";
       
       public static ConnectionFactory getFactory(){
           //设置连接工程
           ConnectionFactory factory = new ConnectionFactory();
           //设置 host
           factory.setHost("localhost");
           //设置 port 默认为5672
           factory.setPort(5672);
           //设置 virtualHost 
           factory.setVirtualHost("/");
           //设置 账号和密码
           factory.setUsername("username");
           factory.setPassword("password");
           return factory;
       }
   
         /**
          * 消费者
          */
       public static void main(String[] args) {
          try {
             //创建连接对象
             Connection connection = getFactory().newConnection();
             //创建信道对象
             Channel channel = connection.createChannel();
             // 声明(创建)队列
             /**
              * 参数1:队列名称
              * 参数2:是否定义持久化队列
              * 参数3:是否独占本次连接
              * 参数4:是否在不使用的时候自动删除队列
              * 参数5:队列其它参数
              */
              channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
             
             //队列绑定交换机
             channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.update");
             channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.delete");

             //回调方法
             DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                 //接受信息到队列
                 String message = new String(delivery.getBody(),"UTF-8");
                 log.info(" [√] 接受到消息:"+message);
             };
     
             channel.basicConsume(TOPIC_QUEUE_1,true,deliverCallback,consumerTag ->{});
     
             //consumer不关闭channel 和 connection 因为要一直监视消息

          } catch (IOException e) {
             log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
          } catch (TimeoutException e) {
             log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
          }
       }
   }

消费者02

  import com.rabbitmq.client.*;
  import lombok.extern.slf4j.Slf4j;
  import org.junit.Test;
  import java.io.IOException;
  import java.util.concurrent.TimeoutException;
  /**
   * RabbitMQ - 通配符模式 消费者
   * @author Admin
   */
  @Slf4j
  public class ConsumerApplication02 {
      
      //交换机名称
      static final String TOPIC_EXCHANGE = "topic_exchange";
      //队列名称
      static final String TOPIC_QUEUE_2 = "topic_queue_2";
      
      public static ConnectionFactory getFactory(){
          //设置连接工程
          ConnectionFactory factory = new ConnectionFactory();
          //设置 host
          factory.setHost("localhost");
          //设置 port 默认为5672
          factory.setPort(5672);
          //设置 virtualHost 
          factory.setVirtualHost("/");
          //设置 账号和密码
          factory.setUsername("username");
          factory.setPassword("password");
          return factory;
      }
  
        /**
         * 消费者
         */
      public static void main(String[] args) {
         try {
            //创建连接对象
            Connection connection = getFactory().newConnection();
            //创建信道对象
            Channel channel = connection.createChannel();
            // 声明(创建)队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
             channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
            
            //队列绑定交换机
            channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHAGE, "item.update");
            channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHAGE, "item.delete");

            //回调方法
            DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                //接受信息到队列
                String message = new String(delivery.getBody(),"UTF-8");
                log.info(" [√] 接受到消息:"+message);
            };
    
            channel.basicConsume(TOPIC_QUEUE_2,true,deliverCallback,consumerTag ->{});
    
            //consumer不关闭channel 和 connection 因为要一直监视消息

         } catch (IOException e) {
            log.error("[IOException]发现错误 发送信息失败:{}", e.getMessage());
         } catch (TimeoutException e) {
            log.error("[TimeoutException]发现错误 发送信息失败:{}", e.getMessage());
         }
      }
  }

6.测试

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。

7.小结

Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

RabbitMQ工作模式总结

一、简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

二、工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

三、发布订阅模式 Publish/subscribe

需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

四、路由模式 Routing

需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

五、通配符模式 Topic

需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

推荐阅读