首页 > 技术文章 > RabbitMQ

ruhuanxingyun 2020-05-04 10:53 原文

简介:RabbitMQ是一个开源的消息代理和队列服务器,通过普通的协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,基于AMQP(高级消息队列协议)的,分布式系统使用广泛;它集群模式丰富,有表达式配置、HA模式、镜像队列模型,保证数据不丢失的前提下做到高可靠性和可用性,支持主流的操作系统和多种语言开发;使用场景有:任务异步处理、应用程序解耦合、削峰填谷

一、 RabbitMQ整体架构

 

  生产者发送消息:

    A. 生产者创建连接,开启信道,连接到RabbitMQ Broker;

    B. 声明队列并设置属性,如:是否排他,是否持久化,是否自动删除;

    C. 将路由键与队列绑定起来;

    D. 发送消息至RabbitMQ Broker;

    E. 关闭信道;

    F. 关闭连接。

  消费者消费消息:

    A. 消费者创建连接,开启信道,连接到RabbitMQ Broker;

    B. 向Broker请求消费相应队列中的消息,设置Broker应的回调函数;

    C. 等待Broker回应关闭投递队列中的消息,消费者接收消息;

    D. 确认(ack,自动确认)接收到消息;

    E. RabbitMQ从队列中删除相应已经被确认的消息;

    F. 关闭信道;

    G. 关闭连接。

 

二、AMQP协议核心概念

  A. Server:又称为Broker,接受客户端(生产者和消费者)的连接,实现AMQP实体服务;

  B. Connection:应用程序与Server的网络连接,比如TCP/IP套接字连接;

  C. Channel:网络信道,多路复用连接中的一条独立的双向数据流通道,几乎所有的操作都在Channel中进行。客户端可以建立多个Channel,每个Channel代表一个会话任务,对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销;

  D. Message:消息,应用程序和服务器之间传递的消息;

  E. Virtual Host:虚拟主机,用于进行逻辑隔离,是最上层的消息路由,标识一批交换机、消息队列和相关对象,一个Virtual Host可以有若干个Exchange和Queue,默认/;

  F. Exchange:交换机,接收消息,按照路由规则将消息路由到一个或多个队列;

  G. Binding:绑定,交换器和消息队列中的虚拟连接,可以包含RoutingKey;

  H. Queue:消息队列,用来保存消息,供消费者消费;

  I. RoutingKey:路由键,生产者将消息发送给交换器的时候,RoutingKey是用来指定路由规则,这样对应发送给消息队列。

 

三、RabbitMQ六种模式

  1. 简单模式HelloWord

    A. 生产者和消费者只有一个,不需要设置交换机(使用默认的交换机)。

  2. 工作队列模式Work Queue

    A. 一个生产者,多个消费者(竞争关系),不需要设置交换机(使用默认的交换机);

    B. 应用场景:对于任务过重或任务较多情况,使用工作队列可以提供任务处理速度。 

  3. 发布与订阅模式Publish/Subscirbe

    A. 需要设置类型为fanout的交换机,并且交换机和队列绑定,当发送消息到交换机后,交换机会将消息发送给队列;

    B. 生产者将消息发给broker,交换机将消息转发到绑定此交换机的每个队列,每个消费者监听自己的队列;

    C. 该模式转发消息是最快的。

  4. 路由模式Routing

    A. 需要设置类型为direct的交换机,并且交换机和队列绑定,并且指定Routingkey,当发送消息到交换机后,交换机根据Routingkey会将消息发送给对应的队列;

    B. 只有队列的Routingkey与消息的Routingkey完全一致,才会接收到消息。

  5. 通配符模式Topic

    A. 需要设置类型为topic的交换机,并且交换机和队列绑定,并且指定通配符方式的Routingkey,当发送消息到交换机后,交换机根据Routingkey会将消息发送给对应的队列;

    B. Routingkey一般是由一个或对个单词组成,多个单词之间以"."分隔,通配符#匹配一个或多个单词,通配符*只能匹配一个词。

  6. RPC远程调用模式

 

 四、过期时间TTL

  1. 定义:过期时间TTL表示可以对消息设置预期的时间,在这个时间内可以被消费者接收消费,过了之后消息将被自动删除,消息会被投送到死信队列;

  2. 设置方式

    A.  通过队列属性设置,队列中所有的消息都有相同的过期时间;

    B.  对消息单独设置,每条消息TTL可以不同;

  3. 代码

 

五、死信队列DLX

  1. 定义:死信队列DLX表示当一个消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就是死信队列

  2. 原因

    A. 原因:消息变成死信,肯能是消息被拒绝,消息过期,队列达到最大长度;

    B. 分类:

  3. 代码

 

六、延迟队列

  1. 定义:延迟队列存储的对象是对应的延迟消息,所谓延迟消息是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能够拿到这个消息进行消费。

  2. 实现方式

    A. 通过过期时间 + 死信队列来实现;

    B. 使用RabbitMQ-delayed-message-exchange插件实现延迟功能

  3. 应用场景

    A. 在电商项目中的场景:支付订单半小时没支付就异常处理等;

    B. 指定的时间之后执行任务:设备升级超时处理等。

  可参考Redis实现延迟队列

 

七、消息确认机制

  1. 定义:消息确认机制是确认并且保证消息被送达;

  2. 实现方式

    A. AMQP事务机制:txSelect()、txCommit()和txRollback()三个方法,业务的处理伴随消息的发送,业务处理失败后要求不发送,缺点是会严重降低RabbitMQ的消息吞吐量;

    B. 确认(confirm)模式:分生产端确认(发送确认回调publisher-confirms和失败返回回调publisher-returns)和消费端确认(自动确认auto和手动确认manual);

  3. 确认模式 

    A. 生产端确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调;

    B. 消费端确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费,因此若我们增加手动确认,则需要代码中明确进行消息确认;

  4. 手动确认

    A. 成功确认:basicAck();

    B. 拒绝多条消息:basicNack();

    C. 拒绝一条消息:basicReject();

    D. 对于失败确认的消息放回队列,若一直异常失败导致进入死循环的解决办法:根据异常类型来选择是否重新放入队列,或是先成功确认然后通过channel.basicPublish()重新发布这个消息。

  注意:当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部

 

八、消息追踪

  1. 消息追踪是使用Trace实现 ,记录RabbitMQ每一次发送的消息,方便开发者调试和排错,通过插件提供可视化界面;

  2. 先启用RabbitMQ插件,再打开开关,使用相关命令操作,rabbitmq-plugins enable rabbitmq_tracing  rabbitmqct trace_on等。

 

九、消息(Message)

  A. 消息:是服务器和应用程序之间传递的数据,由消息头Properties和消息体Body组成,而消息头存放消息的路由键、优先级和延迟等特性,消息体存放内容;

  B. AmqpHeaders API

package org.springframework.amqp.support;

public abstract class AmqpHeaders {

    /**
     * 应用程序ID
     */
    public static final String APP_ID = "amqp_appId";

    /**
     * 集群ID
     */
    public static final String CLUSTER_ID = "amqp_clusterId";

    /**
     * 消息内容的编码格式
     */
    public static final String CONTENT_ENCODING = "amqp_contentEncoding";

    /**
     * 消息内容的类型
     */
    public static final String CONTENT_TYPE = "contentType";

    /**
     * 关联id
     */
    public static final String CORRELATION_ID = "amqp_correlationId";

    /**
     * 投递模式(是否持久化)
     */
    public static final String DELIVERY_MODE = "amqp_deliveryMode";

    /**
     * 消息的失效时间
     */
    public static final String EXPIRATION = "amqp_expiration";

    /**
     * 消息的ID
     */
    public static final String MESSAGE_ID = "amqp_messageId";

    /**
     * 用于指定回复的队列的名称
     */
    public static final String REPLY_TO = "amqp_replyTo";

    /**
     * 消息的时间戳
     */
    public static final String TIMESTAMP = "amqp_timestamp";

    /**
     * 类型
     */
    public static final String TYPE = "amqp_type";

    /**
     * 用户ID
     */
    public static final String USER_ID = "amqp_userId";

}
View Code

 

十、Chanel API

package com.rabbitmq.client;

import com.rabbitmq.client.ShutdownNotifier;

import java.io.IOException;

public interface Channel extends ShutdownNotifier, AutoCloseable {

    /**
     * 发布一个消息
     *
     * @param exchange   名称
     * @param routingKey 路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
     * @param mandatory  为true时如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。为false时出现上述情形broker会直接将消息扔掉
     * @param immediate  为true时如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者
     * @param props      需要注意的是BasicProperties.deliveryMode,1:不持久化 2:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
     * @param body       要发送的信息
     * @throws IOException IO异常
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;

    /**
     * 确认收到的一个或多个消息
     *
     * @param deliveryTag 当前消息的类似编号的号码,服务端为每一个消息生成的类似编号的号码
     * @param multiple    是否一次性把小于deliveryTag的消息确认
     * @throws IOException IO异常
     */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

    /**
     * 拒绝收到的一条或多条消息
     *
     * @param deliveryTag 当前消息的类似编号的号码,服务端为每一个消息生成的类似编号的号码
     * @param multiple    是否一次性把小于deliveryTag的消息确认
     * @param requeue     被拒绝的是否重新入队列 注意:如果设置为true ,则会添加在队列的末端
     * @throws IOException IO异常
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

    /**
     * 拒绝收到的一条消息
     *
     * @param deliveryTag 当前消息的类似编号的号码,服务端为每一个消息生成的类似编号的号码
     * @param requeue     被拒绝的是否重新入队列 注意:如果设置为true ,则会添加在队列的末端
     * @throws IOException IO异常
     */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
    
}
View Code

 

推荐阅读