首页 > 技术文章 > RabbitMQ消息中间件极速入门与实战

zyy1688 2019-01-21 14:16 原文

1:初识RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

2:AMQP协议模型:

3:RabbitMQ的整体架构

4:RabbitMQ核心概念

    • Server:又称Broker,接受客户端的连接,实现AMQP实体服务
    • Connection:连接,应用程序与Broker的网络连接
    • Channel:网络信道

      几乎所有的操作都在Channel中进行
      Channel是进行消息读写的通道
      客户端可建立多个Channel
      每个Channel代表一个会话任务
    • Message:消息

      服务器和应用程序之间传送的数据,由Properties和Body组成
      Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性
      Body则就是消息体内容
    • Virtual host:虚拟机

      用于进行逻辑隔离,最上层的消息路由
      一个Virtual host里面可以有若干个Exchange和Queue
      同一个Virtual host里面不能有相同名称的Exchange或Queue
    • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
    • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
    • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
    • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

5:RabbitMQ消息的流转过程

6:引入依赖 以及生产端的相关配置

 <!--RabbitMQ依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

# RabbitMQ配置
spring.rabbitmq.addresses=192.168.0.105:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# Server配置
server.servlet.context-path=/
server.port=8080

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

7: 订单对象

public class Order implements Serializable{

    private static final long serialVersionUID = 6771608755338249746L;

    private String id;

    private String name;
    /**
     * 存储消息发送的唯一标识
     */
    private String messageId;
}

8:生产端向消息队列发送消息

private RabbitTemplate rabbitTemplate;

@Autowired
public OrderSender(
RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send(Order order) throws Exception {

CorrelationData correlationData = new CorrelationData();
correlationData.setId(order.getMessageId());

// exchange:交换机
// routingKey:路由键
// message:消息体内容
// correlationData:消息唯一ID
this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData);
}

Order order = new Order();
order.setId("201809062009010001");
order.setName("测试订单1");
order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-",""));
this.orderSender.send(order);

9:消费端相关配置

# RabbitMQ连接配置
spring.rabbitmq.addresses=192.168.0.105:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# RabbitMQ消费配置
# 基本并发:5
spring.rabbitmq.listener.simple.concurrency=5
# 最大并发:10
spring.rabbitmq.listener.simple.max-concurrency=10
# 签收模式:手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 限流策略:同一时间只有1条消息发送过来消费
spring.rabbitmq.listener.simple.prefetch=1

# Server配置
server.servlet.context-path=/
server.port=8082

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

10:消费端消费消息

@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理

@Component
public class OrderReceiver {

/**
* 接收消息
*
* @param order 消息体内容
* @param headers 消息头内容
* @param channel 网络信道
* @throws Exception 异常
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true"),
exchange = @Exchange(name = "order-exchange",type = "topic"),
key = "order.*"
))
@RabbitHandler
public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
// 消费者操作
System.out.println("收到消息:");
System.out.println("订单信息:" + order.toString());

// 手动签收消息
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}

 

11:保证100%消息投递

 

 12:mybatis相关依赖

<!--MyBatis依赖-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.24</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.github.miemiedev</groupId>
<artifactId>mybatis-paginator</artifactId>
<version>1.2.17</version>
<exclusions>
<exclusion>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
</exclusion>
</exclusions>
</dependency>

13:rabbitmq采用消息确认模式
# 采用消息确认模式
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

14:mysql相关配置
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.username=root
spring.datasource.password=root

mybatis.type-aliases-package=com.myimooc.rabbitmq.ha.dao.mapper
mybatis.mapper-locations=classpath:com/  /mapper/*.xml


数据源druid.properties配置
##下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小、最小、最大
druid.initialSize=1
druid.minIdle=1
druid.maxActive=300
# 获取连接等待超时时间(单位:毫秒)
druid.maxWait=60000
# 间隔多久进行一次检测,检测需要关闭的空闲连接(单位:毫秒)
druid.timeBetweenEvictionRunsMillis=60000
# 一个连接在池中最小生存的时间(单位:毫秒)
druid.minEvictableIdleTimeMillis=300000
druid.validationQuery=SELECT 1 FROM DUAL
druid.testWhileIdle=true
druid.testOnBorrow=false
druid.testOnReturn=false
# 打开PSCache,并且指定每个连接上PSCache的大小
druid.poolPreparedStatements=true
druid.maxOpenPreparedStatements=20
# 监控统计拦截的filters,去掉后监控界面sql无法统计,wall用于防火墙
druid.filters=stat,wall,log4j
# 通过connectionProperties属性来打开mergeSQL功能:慢SQL记录
druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# 合并多个DruidDataSource的监控数据
druid.useGlobalDataSourceStat=true


DruidDataSourceSettings 类
@Component
@PropertySource("classpath:druid.properties")
public class DruidDataSourceSettings {

@Value("${spring.datasource.driver-class-name}")
private String driverClassName;
@Value("${spring.datasource.url}")
private String url;
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;

@Value("${druid.initialSize}")
private Integer initialSize;
@Value("${druid.minIdle}")
private Integer minIdle;
@Value("${druid.maxActive}")
private Integer maxActive;
@Value("${druid.maxWait}")
private Long maxWait;

@Value("${druid.timeBetweenEvictionRunsMillis}")
private Long timeBetweenEvictionRunsMillis;

@Value("${druid.minEvictableIdleTimeMillis}")
private Long minEvictableIdleTimeMillis;
@Value("${druid.validationQuery}")
private String validationQuery;
@Value("${druid.testWhileIdle}")
private Boolean testWhileIdle;
@Value("${druid.testOnBorrow}")
private Boolean testOnBorrow;
@Value("${druid.testOnReturn}")
private Boolean testOnReturn;

@Value("${druid.poolPreparedStatements}")
private Boolean poolPreparedStatements;
@Value("${druid.maxOpenPreparedStatements}")
private Integer maxOpenPreparedStatements;

@Value("${druid.filters}")
private String filters;
@Value("${druid.connectionProperties}")
private String connectionProperties;
@Value("${druid.useGlobalDataSourceStat}")
private Boolean useGlobalDataSourceStat;

public String getDriverClassName() {
return driverClassName;
}

public String getUrl() {
return url;
}

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}

public Integer getInitialSize() {
return initialSize;
}

public Integer getMinIdle() {
return minIdle;
}

public Integer getMaxActive() {
return maxActive;
}

public Long getMaxWait() {
return maxWait;
}

public Long getTimeBetweenEvictionRunsMillis() {
return timeBetweenEvictionRunsMillis;
}

public Long getMinEvictableIdleTimeMillis() {
return minEvictableIdleTimeMillis;
}

public String getValidationQuery() {
return validationQuery;
}

public Boolean getTestWhileIdle() {
return testWhileIdle;
}

public Boolean getTestOnBorrow() {
return testOnBorrow;
}

public Boolean getTestOnReturn() {
return testOnReturn;
}

public Boolean getPoolPreparedStatements() {
return poolPreparedStatements;
}

public Integer getMaxOpenPreparedStatements() {
return maxOpenPreparedStatements;
}

public String getFilters() {
return filters;
}

public Properties getConnectionProperties() {
Properties properties = new Properties();
String[] entrys = connectionProperties.split(";");
for (String entry : entrys) {
String[] split = entry.split("=");
properties.setProperty(split[0],split[1]);
}
return properties;
}

public Boolean getUseGlobalDataSourceStat() {
return useGlobalDataSourceStat;
}
}

DruidDataSourceConfig 类
@Configuration
@EnableTransactionManagement
public class DruidDataSourceConfig {

@Bean
public DataSource dataSource(DruidDataSourceSettings druidSettings) throws SQLException {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setDriverClassName(druidSettings.getDriverClassName());
dataSource.setUrl(druidSettings.getUrl());
dataSource.setUsername(druidSettings.getUsername());
dataSource.setPassword(druidSettings.getPassword());

dataSource.setInitialSize(druidSettings.getInitialSize());
dataSource.setMinIdle(druidSettings.getMinIdle());
dataSource.setMaxActive(druidSettings.getMaxActive());
dataSource.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
dataSource.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
dataSource.setValidationQuery(druidSettings.getValidationQuery());

dataSource.setTestWhileIdle(druidSettings.getTestWhileIdle());
dataSource.setTestOnBorrow(druidSettings.getTestOnBorrow());
dataSource.setTestOnReturn(druidSettings.getTestOnReturn());
dataSource.setPoolPreparedStatements(druidSettings.getPoolPreparedStatements());
dataSource.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxOpenPreparedStatements());

dataSource.setFilters(druidSettings.getFilters());
dataSource.setConnectProperties(druidSettings.getConnectionProperties());
return dataSource;
}

@Bean
public PlatformTransactionManager transactionManager(DruidDataSourceSettings druidSettings) throws Exception {
DataSourceTransactionManager manager = new DataSourceTransactionManager();
manager.setDataSource(this.dataSource(druidSettings));
return manager;
}
}
MybatisDataSourceConfig 类
@Configuration
public class MybatisDataSourceConfig {

@Autowired
private DataSource dataSource;

@Bean(name="sqlSessionFactory")
public SqlSessionFactory sqlSessionFactoryBean() {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
// 添加XML目录
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
try {
bean.setMapperLocations(resolver.getResources("classpath:com/bfxy/springboot/mapping/*.xml"));
SqlSessionFactory sqlSessionFactory = bean.getObject();
sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);

return sqlSessionFactory;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Bean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}

}
MybatisMapperScanerConfig 类
@Configuration
@AutoConfigureAfter(MybatisDataSourceConfig.class)
public class MybatisMapperScanerConfig {

@Bean
public MapperScannerConfigurer mapperScannerConfigurer() {
MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
mapperScannerConfigurer.setSqlSessionFactoryBeanName("sqlSessionFactory");
mapperScannerConfigurer.setBasePackage("com.bfxy.springboot.mapper");
return mapperScannerConfigurer;
}

}



15:消息发送者确认机制
@Component
public class OrderSender {

@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private BrokerMessageLogMapper brokerMessageLogMapper;
/**
* 回调方法:confirm确认
*/
private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData:" + correlationData);
String messageId = correlationData.getId();
if (ack) {
// 如果confirm返回成功,则进行更新
BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO();
messageLogPO.setMessageId(messageId);
messageLogPO.setStatus(Constants.OrderSendStatus.SEND_SUCCESS);
brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO);
} else {
// 失败则进行具体的后续操作:重试或者补偿等
System.out.println("异常处理...");
}
}
};

/**
* 发送订单
*
* @param order 订单
*/
public void send(Order order) {
// 设置回调方法
this.rabbitTemplate.setConfirmCallback(confirmCallback);
// 消息ID
CorrelationData correlationData = new CorrelationData(order.getMessageId());
// 发送消息
this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData);
}


}
16:FastJson工具类
需要引入
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
public class FastJsonConvertUtils {

private static final Logger logger = LoggerFactory.getLogger(FastJsonConvertUtils.class);

/**
* 将对象转为JSON字符串
*
* @param obj 任意对象
* @return JSON字符串
*/
public static String convertObjectToJson(Object obj) {
try {
return JSON.toJSONString(obj);
} catch (Exception ex) {
logger.warn("将对象转为JSON字符串异常:" + ex);
throw new RuntimeException("将对象转为JSON字符串异常:" + ex.getMessage(), ex);
}
}

/**
* 将JSON字符串转为对象
*
* @param message JSON字符串
* @param type 对象
* @param <T> 对象
* @return 对象实例
*/
public static <T> T convertJsonToObject(String message, Class<T> type) {
try {
return JSONObject.parseObject(message, type);
} catch (Exception ex) {
logger.warn("将JSON字符串转为对象异常:" + ex);
throw new RuntimeException("将JSON字符串转为对象异常:" + ex.getMessage(), ex);
}
}

}
17:
<!--工具类依赖-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
DateUtils.addMinutes(orderTime, 1)

18:定时任务
启用任务
@SpringBootApplication
@MapperScan("com.myimooc.rabbitmq.ha.dao.mapper")
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}

配置定时任务
@Configuration
@EnableScheduling
public class TaskSchedulerConfig implements SchedulingConfigurer {

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskScheduler());
}

@Bean(destroyMethod="shutdown")
public Executor taskScheduler(){
return Executors.newScheduledThreadPool(100);
}

}

定时任务相关执行代码
@Component
public class RetryMessageTask {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private OrderSender orderSender;
@Autowired
private BrokerMessageLogMapper brokerMessageLogMapper;

/**
* 启动完成3秒后开始执行,每隔10秒执行一次
*/
@Scheduled(initialDelay = 3000, fixedDelay = 10000)
public void retrySend() {
logger.debug("重发消息定时任务开始");
// 查询 status = 0 和 timeout 的消息日志
List<BrokerMessageLogPO> pos = this.brokerMessageLogMapper.listSendFailureAndTimeoutMessage();
for (BrokerMessageLogPO po : pos) {
logger.debug("处理消息日志:{}",po);
if (po.getTryCount() >= Constants.MAX_RETRY_COUNT) {
// 更新状态为失败
BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO();
messageLogPO.setMessageId(po.getMessageId());
messageLogPO.setStatus(Constants.OrderSendStatus.SEND_FAILURE);
this.brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO);
} else {
// 进行重试,重试次数+1
this.brokerMessageLogMapper.updateRetryCount(po);
Order reSendOrder = FastJsonConvertUtils.convertJsonToObject(po.getMessage(), Order.class);
try {
this.orderSender.send(reSendOrder);
} catch (Exception ex) {
// 异常处理
logger.error("消息发送异常:{}", ex);
}
}
}
logger.debug("重发消息定时任务结束");
}
}

推荐阅读