[RabbitMQ Study Notes, Part 2] Using RabbitMQ's Four Exchange Types

cac2020 2019-10-12 16:37

Environment
  Windows 7
  rabbitmq-server-3.7.17
  Erlang 22.1

 

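I. Concepts
1. Queues
A queue temporarily stores messages until they are delivered to consumers.
This article distinguishes two kinds of queue by how they are used: immediate queues and delay queues.
  Immediate queue: messages are delivered to consumers as soon as they arrive;
  Delay queue: messages are consumed only after a specified delay has elapsed.

2. Exchanges
An exchange receives messages and routes them to the queues bound to it. An exchange does not store messages. A message that cannot be routed to any queue is dropped by default; if the publisher sets the mandatory flag (publisher returns), the broker returns the unroutable message to the publisher instead.

There are four exchange types: Direct, Topic, Headers and Fanout.
Direct [exact match]: the default exchange that RabbitMQ pre-declares (the one with an empty name) is a direct exchange, and every queue is automatically bound to it with the queue name as its binding key. Routing is match first, then deliver: a queue is bound to the exchange with a binding key, the producer sends each message with a routing key, and the message is delivered to a queue when its routing key is identical to that queue's binding key.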
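
To make the exact-match rule concrete, here is a minimal in-memory sketch. It is a toy model rather than the RabbitMQ client API: the class `DirectExchangeSketch` and its `bind` and `route` methods are made up for illustration, and only the queue names come from the demo project.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a direct exchange (hypothetical, not the RabbitMQ API).
class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues whose binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("q_hello", "q_hello");
        exchange.bind("notify.refund", "notify.refund");
        System.out.println(exchange.route("q_hello"));   // [q_hello]
        System.out.println(exchange.route("q_hell"));    // [] : no partial matching
    }
}
```

A routing key that matches no binding yields an empty list, which corresponds to an unroutable message.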

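Topic [pattern match]: routes messages by wildcard rules (the most flexible type). A queue is bound to the exchange with a pattern (dot-separated words mixed with wildcards), and a message is delivered to the queue only when its routing key matches that pattern.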

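Headers [key-value match]: routes on message header attributes instead of the routing key.
When a queue is bound to the exchange, the binding specifies a set of key-value pairs, and each message carries its own set of headers. The message is delivered to every queue whose binding matches its headers; the binding's x-match argument decides whether all pairs must match or any single pair is enough.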
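
The two matching modes can be sketched in plain Java as follows. This is an illustrative model only: `HeadersMatchSketch`, `matchesAll` and `matchesAny` are hypothetical names, and the header pairs (`type=cash`, `aging=fast`) are borrowed from the demo project.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of headers-exchange matching (hypothetical, not the RabbitMQ API).
class HeadersMatchSketch {
    // "all" semantics: every pair in the binding must appear in the message headers.
    static boolean matchesAll(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> e : binding.entrySet()) {
            if (!e.getValue().equals(headers.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // "any" semantics: a single matching pair is enough.
    static boolean matchesAny(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> e : binding.entrySet()) {
            if (e.getValue().equals(headers.get(e.getKey()))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("type", "cash");
        binding.put("aging", "fast");

        Map<String, Object> headers = new HashMap<>();
        headers.put("type", "cash");

        System.out.println(matchesAll(binding, headers)); // false
        System.out.println(matchesAny(binding, headers)); // true
    }
}
```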

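Fanout [generally the fastest, since no key matching is needed]:
Broadcast routing: a message sent to the exchange is delivered to every queue bound to it. Any routing key set on the message is ignored.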
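
As a rough illustration of the broadcast behaviour, the sketch below copies each published message into every bound queue and ignores the routing key. `FanoutExchangeSketch` and its methods are hypothetical names for an in-memory model, not the RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange (hypothetical, not the RabbitMQ API).
class FanoutExchangeSketch {
    // queue name -> messages currently held by that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName) {
        queues.put(queueName, new ArrayList<>());
    }

    // The routing key is accepted but deliberately never inspected.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("api.report.payment");
        exchange.bind("api.report.refund");
        exchange.publish("any.key.at.all", "start generating reports");
        System.out.println(exchange.messagesIn("api.report.payment")); // [start generating reports]
        System.out.println(exchange.messagesIn("api.report.refund"));  // [start generating reports]
    }
}
```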

3. Building a demo project that integrates Spring Boot with RabbitMQ

II. Direct Exchange - Work Mode

Configuration class:

package com.wjy.direct;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

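// @Configuration is required: it makes Spring process this class and register the @Bean methods below before the beans that depend on them are created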
@Configuration
public class RabbitConfig {

    /**
    * @Desc:  Declare a queue named q_hello (on the default exchange the routing key equals the queue name)
    */
    @Bean
    public Queue queue() {
        return new Queue("q_hello");
    }

    /**
     * @Desc:  Declare a queue named notify.refund (routingKey=notify.refund)
     */
    @Bean
    public Queue refundNotifyQueue() {
        return new Queue("notify.refund");
    }

    /**
     * @Desc:  Declare a queue named query.order (routingKey=query.order), used for the RPC test
     */
    @Bean
    public Queue queryOrderQueue() {
        return new Queue("query.order");
    }
}

 

Producer:

package com.wjy.direct;


import com.wjy.mojo.Order;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* @Desc: Producer
*/
@Component
public class MqSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
    * @Desc: Send a message to the default exchange with routingKey q_hello
    */
    public void send() {
        // 24-hour clock
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String context = "hello " + date;
        System.err.println("Sender : " + context);
        // With a plain queue on the default exchange, the routing key is the queue name
        this.rabbitTemplate.convertAndSend("q_hello", context);
    }

    /**
     * @Desc: Send a numbered message to the default exchange with routingKey q_hello
     */
    public void send(String i) {
        // 24-hour clock
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String context = "hello " + i + " " + date;
        System.err.println("Sender : " + context);
        // With a plain queue on the default exchange, the routing key is the queue name
        this.rabbitTemplate.convertAndSend("q_hello", context);
    }

    /**
     * @Desc: Send an object as the message payload
     */
    public void sender(Order order){
        System.err.println("notify.refund send message: "+order);
        rabbitTemplate.convertAndSend("notify.refund", order);
    }

    /**
     * @Desc: RPC test: send a request and block until the reply arrives
     */
    public void sender(String orderId){
        System.err.println("query.order send message: "+orderId);
        Order order = (Order) rabbitTemplate.convertSendAndReceive("query.order", orderId);
        System.err.println("query.order return message: "+order);
    }
}

Consumers:

package com.wjy.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Desc:  Consumer
*/
@Component
@RabbitListener(queues = "q_hello")
public class Receiver {

    /**
    * @Desc: Handle messages from the q_hello queue
    */
    @RabbitHandler
    public void process(String hello) {
        System.err.println("Receiver1  : " + hello);
    }
}
package com.wjy.direct;

import com.wjy.mojo.Order;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "notify.refund")
public class RefundNotifyReceive {
    @RabbitHandler
    public void receive(Order order) {
        System.err.println("notify.refund receive message: "+order);
    }
}
package com.wjy.direct;

import com.wjy.mojo.Order;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.Date;

@Component
@RabbitListener(queues = "query.order")
public class QueryOrderReceive {
    @RabbitHandler
    public Order receive(String orderId) {
        System.err.println("query.order receive message: "+orderId);

        Order order = new Order();
        order.setId(100001);
        order.setOrderId(orderId);
        order.setAmount(new BigDecimal("2999.99"));
        order.setCreateTime(new Date());
        return order;
    }
}

Test class:

package com.wjy.direct;

import com.wjy.mojo.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.math.BigDecimal;
import java.util.Date;

/**
* @Desc: Test class
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class DirectExchangeTest {
    @Autowired
    private MqSender mqSender;

    @Test
    public void hello() throws Exception {
        mqSender.send();
    }

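    /**
    * @Desc:  One-to-many: one producer, several consumers sharing the queue
    * Use case: systems are usually deployed as clusters, distributed services or disaster-recovery replicas
    */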
    @Test
    public void oneToMany() throws Exception {
        for (int i=0;i<100;i++){
            mqSender.send(i+"");
            Thread.sleep(200);
        }
    }

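    /**
     * @Desc:  Many-to-one: this producer sends the even-numbered requests
     * Use case: systems are usually deployed as clusters, distributed services or disaster-recovery replicas
     */
    @Test
    public void test_sender_many2one_1() throws Exception {
        for (int i = 0; i < 20; i+=2) {
            mqSender.send("Payment order no.: "+i);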
            Thread.sleep(1000);
        }
    }

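    /**
     * @Desc:  Many-to-one: this producer sends the odd-numbered requests
     * Use case: systems are usually deployed as clusters, distributed services or disaster-recovery replicas
     */
    @Test
    public void test_sender_many2one_2() throws Exception {
        for (int i = 1; i < 20; i+=2) {
            mqSender.send("Payment order no.: "+i);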
            Thread.sleep(1000);
        }
    }


    /**
     * @Desc:  Test sending an object
     */
    @Test
    public void test_sender() {
        Order order = new Order();
        order.setId(100001);
        order.setOrderId(String.valueOf(System.currentTimeMillis()));
        order.setAmount(new BigDecimal("1999.99"));
        order.setCreateTime(new Date());
        mqSender.sender(order);
    }

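    /**
     * @Desc:  RPC test
     * RabbitMQ supports RPC-style calls that return the result synchronously.
     * Although RPC over RabbitMQ works, it is not recommended.
     * Reasons:
     * 1) By default the call is single-threaded and blocking: the caller waits for the reply, so throughput is low.
     * 2) Multi-threaded consumption has to be implemented by hand.
     */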
    @Test
    public void test_rpc() {
        mqSender.sender("900000001");
    }
}

 

III. Topic Exchange - Topic Mode


The symbol "#" matches zero or more words; the symbol "*" matches exactly one word. Words are separated by dots.
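
The wildcard rules can be checked with a small matcher written in plain Java. This is a sketch of the matching rule only, not RabbitMQ's implementation: `TopicMatchSketch` and `matches` are hypothetical names, and the patterns and routing keys are the ones used in the listings of this section.

```java
// Toy matcher for topic-exchange patterns (hypothetical, not RabbitMQ's implementation).
class TopicMatchSketch {
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // p = pattern words, k = routing-key words; i and j are the current positions.
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;               // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words, or one word and then keeps absorbing
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;                       // pattern still needs a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);   // '*' or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("api.core.*", "api.core.user"));                     // true
        System.out.println(matches("api.core.*", "api.core.user.query"));               // false
        System.out.println(matches("api.payment.#", "api.payment"));                    // true
        System.out.println(matches("api.payment.#", "api.payment.order.detail.query")); // true
    }
}
```

The second result explains why a message sent with the key api.core.user.query never reaches a queue bound with api.core.*.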

Configuration class:

package com.wjy.topic;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {

    final static String message = "api.core";
    final static String messages = "api.payment";

    /**
     * Declare a queue named api.core
     */
    @Bean
    public Queue coreQueue() {
        return new Queue(TopicRabbitConfig.message);
    }

    /**
    * @Desc: Declare a queue named api.payment
    */
    @Bean
    public Queue paymentQueue() {
        return new Queue(TopicRabbitConfig.messages);
    }

    /**
     * @Desc: The coreExchange topic exchange
     */
    @Bean
    public TopicExchange coreExchange() {
        return new TopicExchange("coreExchange");
    }

    /**
     * @Desc: The paymentExchange topic exchange
     */
    @Bean
    public TopicExchange paymentExchange() {
        return new TopicExchange("paymentExchange");
    }

    /**
     * Bind the api.core queue to coreExchange with the pattern api.core.* (exactly one word after api.core)
     */
    @Bean
    public Binding bindingCoreExchange(Queue coreQueue, TopicExchange coreExchange) {
        return BindingBuilder.bind(coreQueue).to(coreExchange).with("api.core.*");
    }

    /**
     * @Desc: Bind the api.payment queue to paymentExchange with the pattern api.payment.# (zero or more words after api.payment)
     */
    @Bean
    public Binding bindingPaymentExchange(Queue paymentQueue, TopicExchange paymentExchange) {
        return BindingBuilder.bind(paymentQueue).to(paymentExchange).with("api.payment.#");
    }
}

 

Producers:

package com.wjy.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ApiCoreSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
    * @Desc: Send a message to coreExchange with routingKey api.core.user
    */
    public void user(String msg){
        System.err.println("api.core.user send message: "+msg);
        rabbitTemplate.convertAndSend("coreExchange", "api.core.user", msg);
    }

    /**
     * @Desc: Send a message to coreExchange with routingKey api.core.user.query
     */
    public void userQuery(String msg){
        System.err.println("api.core.user.query send message: "+msg);
        rabbitTemplate.convertAndSend("coreExchange", "api.core.user.query", msg);
    }
}
package com.wjy.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ApiPaymentSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
    * @Desc: order() sends a message to paymentExchange with routingKey api.payment.order
    */
    public void order(String msg){
        System.err.println("api.payment.order send message: "+msg);
        rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order", msg);
    }

    /**
     * @Desc: orderQuery() sends a message to paymentExchange with routingKey api.payment.order.query
     */
    public void orderQuery(String msg){
        System.err.println("api.payment.order.query send message: "+msg);
        rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.query", msg);
    }

    /**
     * @Desc: orderDetailQuery() sends a message to paymentExchange with routingKey api.payment.order.detail.query
     */
    public void orderDetailQuery(String msg){
        System.err.println("api.payment.order.detail.query send message: "+msg);
        rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.detail.query", msg);
    }
}

Consumers:

package com.wjy.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ApiCoreReceive {
    @RabbitHandler
    @RabbitListener(queues = "api.core")
    public void handle(String msg) {
        System.err.println("api.core receive message: "+msg);
    }
}
package com.wjy.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ApiPaymentReceive {
    @RabbitHandler
    @RabbitListener(queues = "api.payment")
    public void handle(String msg) {
        System.err.println("api.payment.order receive message: "+msg);
    }
}

Test classes:

package com.wjy.topic;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiCoreSenderTests {
    @Autowired
    private ApiCoreSender sender;

    @Test
    public void test_user() {
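        sender.user("User management!");
    }

    @Test
    public void test_userQuery() {
        // api.core.user.query has two words after api.core, so it does not match api.core.* and is not delivered
        sender.userQuery("Query user information!");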
    }
}
package com.wjy.topic;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiPaymentSenderTests {
    @Autowired
    private ApiPaymentSender sender;

    @Test
    public void test_order() {
        sender.order("Order management!");
    }

    @Test
    public void test_orderQuery() {
        sender.orderQuery("Query order information!");
    }

    @Test
    public void test_orderDetailQuery() {
        sender.orderDetailQuery("Query order details!");
    }
}

IV. Fanout Exchange - Publish/Subscribe Mode

Configuration class:

package com.wjy.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    /**
    * @Desc:  Declare a queue named api.report.payment
    */
    @Bean
    public Queue reportPaymentQueue() {
        return new Queue("api.report.payment");
    }

    /**
     * @Desc:  Declare a queue named api.report.refund
     */
    @Bean
    public Queue reportRefundQueue() {
        return new Queue("api.report.refund");
    }

    /**
     * @Desc:  Declare the reportExchange fanout exchange
     */
    @Bean
    public FanoutExchange reportExchange() {
        return new FanoutExchange("reportExchange");
    }


    /**
     * @Desc:  Bind the api.report.payment queue to reportExchange (no routing key is needed for fanout)
     */
    @Bean
    public Binding bindingReportPaymentExchange(Queue reportPaymentQueue, FanoutExchange reportExchange) {
        return BindingBuilder.bind(reportPaymentQueue).to(reportExchange);
    }

    /**
     * @Desc:  Bind the api.report.refund queue to reportExchange (no routing key is needed for fanout)
     */
    @Bean
    public Binding bindingReportRefundExchange(Queue reportRefundQueue, FanoutExchange reportExchange) {
        return BindingBuilder.bind(reportRefundQueue).to(reportExchange);
    }
}

 

Producer:

package com.wjy.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ApiReportSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void generateReports(String msg){
        System.err.println("api.generate.reports send message: "+msg);
        rabbitTemplate.convertAndSend("reportExchange", "api.generate.reports", msg);
    }
}

 

Consumer:

package com.wjy.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ApiReportReceive {
    @RabbitHandler
    @RabbitListener(queues = "api.report.payment")
    public void payment(String msg) {
        System.err.println("api.report.payment receive message: "+msg);
    }

    @RabbitHandler
    @RabbitListener(queues = "api.report.refund")
    public void refund(String msg) {
        System.err.println("api.report.refund receive message: "+msg);
    }
}

 

Test class:

package com.wjy.fanout;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiReportSenderTests {
    @Autowired
    private ApiReportSender sender;

    @Test
    public void test_generateReports() {
        sender.generateReports("Start generating reports!");
    }
}

V. Headers Exchange

Configuration class:

package com.wjy.headers;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class HeadersConfig {
    /**
    * @Desc: Declare a queue named credit.bank
    */
    @Bean
    public Queue creditBankQueue() {
        return new Queue("credit.bank");
    }

    /**
     * @Desc: Declare a queue named credit.finance
     */
    @Bean
    public Queue creditFinanceQueue() {
        return new Queue("credit.finance");
    }

    /**
     * @Desc: Declare the creditBankExchange headers exchange
     */
    @Bean
    public HeadersExchange creditBankExchange() {
        return new HeadersExchange("creditBankExchange");
    }

    /**
     * @Desc: Declare the creditFinanceExchange headers exchange
     */
    @Bean
    public HeadersExchange creditFinanceExchange() {
        return new HeadersExchange("creditFinanceExchange");
    }

    /**
     * @Desc: Bind the credit.bank queue to creditBankExchange; all listed headers must match
     */
    @Bean
    public Binding bindingCreditAExchange(Queue creditBankQueue, HeadersExchange creditBankExchange) {
        Map<String,Object> headerValues = new HashMap<>();
        headerValues.put("type", "cash");
        headerValues.put("aging", "fast");
        // whereAll: every listed header must match
        return BindingBuilder.bind(creditBankQueue).to(creditBankExchange).whereAll(headerValues).match();
    }

    /**
    * @Desc: Bind the credit.finance queue to creditFinanceExchange; any one listed header is enough
    */
    @Bean
    public Binding bindingCreditBExchange(Queue creditFinanceQueue, HeadersExchange creditFinanceExchange) {
        Map<String,Object> headerValues = new HashMap<>();
        headerValues.put("type", "cash");
        headerValues.put("aging", "fast");
        // whereAny: a single matching header is enough
        return BindingBuilder.bind(creditFinanceQueue).to(creditFinanceExchange).whereAny(headerValues).match();
    }
}

Producer:

package com.wjy.headers;


import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class ApiCreditSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void creditBank(Map<String, Object> head, String msg){
        System.err.println("credit.bank send message: "+msg);
        rabbitTemplate.convertAndSend("creditBankExchange", "credit.bank", getMessage(head, msg));
    }

    public void creditFinance(Map<String, Object> head, String msg){
        System.err.println("credit.finance send message: "+msg);
        rabbitTemplate.convertAndSend("creditFinanceExchange", "credit.finance", getMessage(head, msg));
    }

    private Message getMessage(Map<String, Object> head, Object msg){
        MessageProperties messageProperties = new MessageProperties();
        for (Map.Entry<String, Object> entry : head.entrySet()) {
            messageProperties.setHeader(entry.getKey(), entry.getValue());
        }
        MessageConverter messageConverter = new SimpleMessageConverter();
        return messageConverter.toMessage(msg, messageProperties);
    }
}

Consumer:

package com.wjy.headers;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ApiCreditReceive {
    @RabbitHandler
    @RabbitListener(queues = "credit.bank")
    public void creditBank(String msg) {
        System.err.println("credit.bank receive message: "+msg);
    }

    @RabbitHandler
    @RabbitListener(queues = "credit.finance")
    public void creditFinance(String msg) {
        System.err.println("credit.finance receive message: "+msg);
    }
}

Test class:

package com.wjy.headers;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiCreditSenderTests {
    @Autowired
    private ApiCreditSender sender;

    @Test
    public void test_creditBank_type() {
        Map<String,Object> head = new HashMap<>();
        head.put("type", "cash");
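        // Only one of the two bound headers is present, so the whereAll binding does not match and nothing is delivered
        sender.creditBank(head, "Bank credit (partial match)");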
    }

    @Test
    public void test_creditBank_all() {
        Map<String,Object> head = new HashMap<>();
        head.put("type", "cash");
        head.put("aging", "fast");
        sender.creditBank(head, "Bank credit (full match)");
    }

    @Test
    public void test_creditFinance_type() {
        Map<String,Object> head = new HashMap<>();
        head.put("type", "cash");
        sender.creditFinance(head, "Finance company credit (partial match)");
    }

    @Test
    public void test_creditFinance_all() {
        Map<String,Object> head = new HashMap<>();
        head.put("type", "cash");
        head.put("aging", "fast");
        sender.creditFinance(head, "Finance company credit (full match)");
    }
}

VI. Delay Queue
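
The listings in this section build the delay from a per-message TTL plus dead-lettering: a message waits in a queue that has no consumer, and once it expires the broker republishes it to the queue that is actually consumed. The sketch below models that flow with a logical clock so it runs without a broker. It is an assumption-laden toy: `DelayQueueSketch`, `publish`, `tick` and `delivered` are hypothetical names, and time is passed in explicitly instead of being read from the system.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of "TTL + dead-letter" delay (hypothetical, not the RabbitMQ API).
class DelayQueueSketch {
    // A message parked in the consumer-less queue, with its absolute expiry time.
    static final class Pending {
        final String body;
        final long expiresAtMillis;

        Pending(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    // Stands in for default.dead.letter.queue (nobody consumes from it).
    private final Deque<Pending> waitingQueue = new ArrayDeque<>();
    // Stands in for default.repeat.trade.queue (the queue the listener reads).
    private final List<String> targetQueue = new ArrayList<>();

    void publish(String body, long nowMillis, long ttlMillis) {
        waitingQueue.addLast(new Pending(body, nowMillis + ttlMillis));
    }

    // Dead-letters expired messages, examining only the head of the waiting queue.
    void tick(long nowMillis) {
        while (!waitingQueue.isEmpty() && waitingQueue.peekFirst().expiresAtMillis <= nowMillis) {
            targetQueue.add(waitingQueue.pollFirst().body);
        }
    }

    List<String> delivered() {
        return targetQueue;
    }

    public static void main(String[] args) {
        DelayQueueSketch broker = new DelayQueueSketch();
        broker.publish("delayed hello", 0, 6000);
        broker.tick(5999);
        System.out.println(broker.delivered()); // []
        broker.tick(6000);
        System.out.println(broker.delivered()); // [delayed hello]
    }
}
```

Because only the head of the waiting queue is examined, a message with a short TTL that sits behind one with a long TTL is held back until the earlier message expires. To the best of my knowledge RabbitMQ's per-message TTL behaves the same way, so a single delay queue works best when all messages share one delay.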

Configuration class:

package com.wjy.delaymq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfiguration {

    /**
     * @Desc: The queue app.queue.hello
     */
    @Bean
    public Queue helloQueue() {
        Queue queue = new Queue("app.queue.hello", true, false, false);
        return queue;
    }

    /**
     * Default direct exchange for immediate messages
     */
    @Bean("defaultDirectExchange")
    public DirectExchange defaultDirectExchange() {
        return new DirectExchange("default.direct.exchange", true, false);
    }

    /**
     * @Desc: Bind the queue app.queue.hello to the default direct exchange
     * Binding key: app.queue.hello
     */
    @Bean
    public Binding helloBinding() {
        return BindingBuilder.bind(helloQueue()).to(defaultDirectExchange()).with("app.queue.hello");
    }

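    /**
     * Declare the dead-letter queue for delayed messages: messages wait here, with no consumer, until their TTL expires
     */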
    @Bean
    public Queue defaultDeadLetterQueue() {
        Map<String, Object> arguments = new HashMap<>();
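        // Exchange that expired (dead-lettered) messages are republished to
        arguments.put("x-dead-letter-exchange", "default.direct.exchange");
        // Routing key used when republishing; here it is the name of the forwarding queue
        arguments.put("x-dead-letter-routing-key", "default.repeat.trade.queue");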
        Queue queue = new Queue("default.dead.letter.queue", true, false, false, arguments);
        return queue;
    }

    /**
    * @Desc: Bind the dead-letter (delay) queue to the default direct exchange
     * Binding key: default.dead.letter.queue
    */
    @Bean
    public Binding defaultDeadLetterBinding() {
        Binding bind = BindingBuilder.bind(defaultDeadLetterQueue()).to(defaultDirectExchange()).with("default.dead.letter.queue");
        return bind;
    }


    /**
     * Declare the forwarding queue default.repeat.trade.queue
     * @return
     */
    @Bean
    public Queue defaultRepeatTradeQueue() {
        Queue queue = new Queue("default.repeat.trade.queue", true, false, false);
        return queue;
    }

    /**
     * Bind the forwarding queue to the default direct exchange;
     * Binding key: default.repeat.trade.queue
     */
    @Bean
    public Binding defaultRepeatTradeBinding() {
        return BindingBuilder
                .bind(defaultRepeatTradeQueue())
                .to(defaultDirectExchange())
                .with("default.repeat.trade.queue");
    }


}

Producer:

package com.wjy.delaymq;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(QueueMessage message) {
        // Immediate message
        if(message.getType() == 10){
            sendMessage(message.getExchange(),message.getQueueName(),message.getMessage());
        }
        // Delayed message
        if(message.getType() == 20){
            sendTimeMessage(message);
        }
    }

    // Send an immediate message
    private void sendMessage(String exchange,String queueName,String msg){
        rabbitTemplate.convertAndSend(exchange,queueName, msg);
    }

    // Send a delayed message
    public void sendTimeMessage(QueueMessage message) {
        int seconds = message.getSeconds();
        // Send directly; there is no need to go through the dead-letter queue
        if(seconds <= 0){
            sendMessage(message.getExchange(),message.getQueueName(), message.getMessage());
        }else{
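            // RabbitMQ expects the expiration in milliseconds
            long times = seconds * 1000L;
            // Post-processor that sets the per-message TTL; the expiration property is passed as a string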
            MessagePostProcessor processor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setExpiration(times + "");
                    return message;
                }
            };
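            // Note: the payload must be a String, a byte[] or a Serializable object;
            // otherwise the listener fails with: Execution of Rabbit message listener failed
            // After changing the payload type, purge the old messages from the queue or the error will persist
            rabbitTemplate.convertAndSend("default.direct.exchange","default.dead.letter.queue", "Forwarded message", processor);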
        }
    }

}

Consumers:

package com.wjy.delaymq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "default.repeat.trade.queue")
public class TradeRecever {
    @Autowired
    private Sender sender;

    @RabbitHandler
    public void process(String content) {
        System.err.println("-----------delay elapsed--------------"+content);
        QueueMessage message = new QueueMessage("app.queue.hello", "Forwarded message...");
        message.setType(10);
        sender.send(message);
    }
}
package com.wjy.delaymq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "app.queue.hello")
public class HelloRecever {

    @RabbitHandler
    public void process(String content) {

        System.err.println("hello received message: " + content);
    }
}

POJO:

package com.wjy.delaymq;

import java.io.Serializable;
import java.util.Date;

public class QueueMessage implements Serializable {
    private String exchange;

    private String queueName;

    private Integer type;

    private Integer group;

    private Date timestamp;

    private String message;

    private Integer status;

    private int retry = 0;

    private int maxRetry = 10;

    private int seconds = 1;

    public QueueMessage() {
        super();
    }

    public QueueMessage(String queueName, String message) {
        super();
        this.queueName = queueName;
        this.message = message;
        this.exchange = "default.direct.exchange";
        this.type = 10;
        this.group = 10;
        this.timestamp = new Date();
        this.status = 10;
    }

    public String getExchange() {
        return exchange;
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public Integer getType() {
        return type;
    }

    public void setType(Integer type) {
        this.type = type;
    }

    public Integer getGroup() {
        return group;
    }

    public void setGroup(Integer group) {
        this.group = group;
    }

    public Date getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public int getRetry() {
        return retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }

    public int getMaxRetry() {
        return maxRetry;
    }

    public void setMaxRetry(int maxRetry) {
        this.maxRetry = maxRetry;
    }

    public int getSeconds() {
        return seconds;
    }

    public void setSeconds(int seconds) {
        this.seconds = seconds;
    }
}

Test class:

package com.wjy.delaymq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayTest {

    @Autowired
    private Sender sender;

    @Test
    public void delaySendTest() {
        System.err.println("Sending delayed message...");
        QueueMessage message = new QueueMessage("app.queue.hello", "Test delayed message...");
        // 20 marks a delayed message;
        message.setType(20);
        // Set the delay, in seconds (Sender converts it to milliseconds);
        message.setSeconds(6);
        sender.send(message);
        try {
            Thread.sleep(600000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

VII. Message Acknowledgement
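
The consumer in this section acknowledges deliveries by delivery tag, and the second argument of basicAck (the "multiple" flag) controls how many deliveries one call settles. The toy model below illustrates that flag. `AckSketch`, `deliver`, `ack` and `unackedCount` are hypothetical names for an in-memory stand-in, not the RabbitMQ client API.

```java
import java.util.TreeSet;

// Toy model of delivery tags and the "multiple" flag (hypothetical, not the RabbitMQ API).
class AckSketch {
    // Delivery tags handed to the consumer but not yet acknowledged.
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    // Simulates the broker delivering one message on this channel.
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Settle every outstanding delivery up to and including deliveryTag.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Settle only this one delivery.
            unacked.remove(deliveryTag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch single = new AckSketch();
        single.deliver();
        long second = single.deliver();
        single.deliver();
        single.ack(second, false);
        System.out.println(single.unackedCount()); // 2

        AckSketch bulk = new AckSketch();
        bulk.deliver();
        long tag = bulk.deliver();
        bulk.deliver();
        bulk.ack(tag, true);
        System.out.println(bulk.unackedCount());   // 1
    }
}
```

With multiple set to true, one call can settle deliveries the handler has not finished processing, so acknowledging one message at a time is usually the safer choice.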

Configuration class:

package com.wjy.ack;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AckRabbitConfig {


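    /**
     * Declare the queue named queue-test
     * The Queue constructor can take these 4 parameters
     *      1.name          the queue name
     *      2.durable       the queue survives a RabbitMQ restart, so it need not be declared again; default true
     *      3.exclusive     the queue can be used only by the connection that declared it; default false
     *      4.autoDelete    the queue is deleted automatically once it is no longer in use; default false
     */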
    @Bean
    public Queue helloQueue() {
        return new Queue("queue-test");
    }

    /** ======================== Custom routing setup =============================*/

     /**
     * Fanout is the familiar broadcast (publish/subscribe) mode: a message sent to a fanout exchange is received by every queue bound to that exchange.
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("ABExchange");
    }


    @Bean
    public Binding bindingExchangeA(Queue helloQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(helloQueue).to(fanoutExchange);
    }

}

 

Producer:

package com.wjy.ack;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class Producer implements RabbitTemplate.ReturnCallback  {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Send a message to the queue-test queue
     */
    public void send() {
        String context = "Hello, the time is now " + new Date();
        System.err.println("HelloSender sending : " + context);
        this.rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                // correlationData is null here because no CorrelationData is passed to convertAndSend
                System.out.println("HelloSender message send failed: " + cause + " " + correlationData);
            } else {
                System.out.println("HelloSender message sent successfully");
            }
        });

        rabbitTemplate.convertAndSend("queue-test", context);
    }

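    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Called only when the broker returns a message that could not be routed to any queue
        System.err.println("sender message returned: " + message.toString()+"==="+replyCode+"==="+exchange+"==="+routingKey);
    }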
}

 

Consumer:

package com.wjy.ack;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@RabbitListener(queues = "queue-test")
public class Comsumer {

    @RabbitHandler
    public void process(String msg,Message message, Channel channel) throws IOException {
        try {
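            // Manual acknowledgement mode: acknowledging explicitly is safer and more robust
            /*channel.basicAck(deliveryTag, multiple)
            deliveryTag - the sequence number the broker assigns to each delivery on this channel
            multiple - false acknowledges only this delivery; true acknowledges every unacknowledged delivery on this channel up to and including deliveryTag
            */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.err.println("receive: " + new String(message.getBody()));
        }
        catch (Exception e){
            // Negative acknowledgement without requeueing: the message is discarded (or dead-lettered if the queue has a dead-letter exchange)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            // Alternative: reject this message and put it back on the queue
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);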
        }
    }
}

 

Test class:

package com.wjy.ack;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqAckTests {
    @Autowired
    private Producer producer;

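    /**
    * @Desc: Before running this test, enable the message confirmation settings in application.yml (publisher confirms and returns, and manual listener acknowledgement)
    */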
    @Test
    public void send() {
        producer.send();
    }
}

 
