@
1.什么是中间件
我国企业从20世纪80年代开始就逐渐进行信息化建设,由于方法和体系的不成熟,以及企业业务和市场需求的不断变化,一个企业可能同时运行着多个不同的业务系统,这些系统可能基于不同的操作系统、不同的数据库、异构的网络环境。现在的问题是,如何把这些信息系统结合成一个有机地协同工作的整体,真正实现企业跨平台、分布式应用。中间件便是解决之道,它用自己的复杂换取了企业应用的简单。
中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。
2.为什么需要使用消息中间件
具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担。中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。
3.中间件特点
为解决分布异构问题,人们提出了中间件(middleware)的概念。中间件是位于平台(硬件和操作系统)和应用之间的通用服务,如下图所示,这些服务具有标准的程序接口和协议。针对不同的操作系统和硬件平台,它们可以有符合接口和协议规范的多种实现。
也许很难给中间件一个严格的定义,但中间件应具有如下的一些特点:
(1)满足大量应用的需要
(2)运行于多种硬件和OS平台
(3)支持分布计算,提供跨网络、硬件和OS平台的透明性的应用或服务的交互
(4)支持标准的协议
(5)支持标准的接口
简单说:中间件有个很大的特点,是脱离于具体设计目标,而具备提供普遍独立功能需求的模块。这使得中间件一定是可替换的。如果一个系统设计中,中间件是不可替换的,不是架构、框架设计有问题,那么就是这个中间件,在 别处可能是个中间件,在这个系统内是引擎。哈。
4.网络协议的三要素 (amqp协议)
1.语法。语法是用户数据与控制信息的结构与格式,以及数据出现的顺序。
2.语义。语义是解释控制信息每个部分的意义。它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应。
3.时序。时序是对事件发生顺序的详细说明。
面试题:为什么消息中间件不直接使用http协议呢?
1: 因为http请求报文头和响应报文头是比较复杂的,包含了cookie,数据的加密解密,状态码,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速。
2:大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。
5.消息的分发策略
ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
---|---|---|---|---|
发布订阅 | 支持 | 支持 | 支持 | 支持 |
轮询分发 | 支持 | 支持 | 支持 | / |
公平分发 | / | 支持 | 支持 | / |
重发 | 支持 | 支持 | / | 支持 |
消息拉取 | / | 支持 | 支持 | 支持 |
6.安装rabbitmq+erlang
relang安装
wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm //下载erlang
rpm -Uvh erlang-solutions-2.0-1.noarch.rpm //解压
yum install -y erlang //安装
erl -v /查看版本
rabbitmq安装
docker安装
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
docker exec -it 镜像ID /bin/bash //进入容器
rabbitmq-plugins enable rabbitmq_management//安装插件 ctrl + p+q退出
http://192.168.18.128:15672/ //地址
失败方式,权限不完善
//账号 密码 映射端口 docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:3.8-management //3.8之后的不允许这样设置权限
7.RabbitMQ的核心组成部分
架构
运行流程
RabbitMQ支持消息的模式
1、简单模式 Simple
- 参考第12章节
2、工作模式 Work
- web操作查看视频
- 类型:无
- 特点:分发机制
3、发布订阅模式
- web操作查看视频
- 类型:fanout
- 特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。
4、路由模式
- web操作查看视频
- 类型:direct
- 特点:有routing-key的匹配模式
5、主题Topic模式
- web操作查看视频
- 类型:topic
- 特点:模糊的routing-key的匹配模式
- 通配符 (# 代表0或多个,*至少一个)
6、参数模式
- web操作查看视频
- 类型:headers
- 特点:参数匹配模式
8.RabbitMQ使用场景
消息队列-RabbitMQ篇章- 专栏 -KuangStudy
9.RabbitMQ整合SpringBoot
producer 生产者
配置class
@Configuration
public class RabbitMqConfiguration {
//声明fanout模式交换机 换成不同类型进行不同转发
@Bean
public FanoutExchange fanoutOrderExchange(){
return new FanoutExchange("fanout_order_exhange",true,false);
}
//创建队列,name
@Bean
public Queue smsQueue(){
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue duanxinQueue(){
return new Queue("duanxin.fanout.queue", true);
}
@Bean
public Queue emailQueue(){
return new Queue("email.fanout.queue", true);
}
@Bean
public Binding binding1(){
/*
fanout没有路由匹配,所以没有with方法
direct:绑定确定key
topic:模糊的key
*/
return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()); //.with(key)
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(duanxinQueue()).to(fanoutOrderExchange());
}
@Bean
public Binding binding3(){
return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
}
}
发送Services
@Service
public class OrderServices {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 模拟用户下单
* @param userId
* @param productId
* @param num
*/
public void makeOrder(String userId, String productId, int num){
String orderId = UUID.randomUUID().toString();
System.out.println("订单派发成功 " + orderId);
String exchangeName = "fanout_order_exhange"; //交换机名字
String routingKey = ""; //带路由.topic模式需要编写表达式来模糊匹配
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
//rabbitTemplate.convertAndSend(exchangeName, routingKey2, orderId);//多发
}
}
测试
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Autowired
private OrderServices orderServices;
@Test
void contextLoads() {
orderServices.makeOrder("1", "1", 12);
}
}
consumber 消费者
package com.rain.services.fanout;
@RabbitListener(queues = "duanxin.fanout.queue") //队列名字
@Component
public class FanoutDuanxing {
@RabbitHandler
public void revicesMessage(String mesage) {
System.out.println("duan xin ----接受的消息是:->" + mesage);
}
}
注解形式
package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutconsumer.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "email.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {
// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值
@RabbitHandler
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("email-------------->" + message);
}
}
10.过期时间
队列过期时间
@Bean public Queue TTLQueue(){ Map<String,Object> args = new HashMap<>(); args.put("x-message-ttl",5000); //设置过期时间5s return new Queue("TTL.queue", true, false, false, args); } //其他不变
指定消息过期时间
public void makeOrderttlOne(String userId, String productId, int num){ String orderId = UUID.randomUUID().toString(); System.out.println("订单派发成功 " + orderId); String exchangeName = "TTL_exhange"; String routingKey = "ttlmag"; //带路由 //给消息设置过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); message.getMessageProperties().setContentEncoding("utf-8"); return message; } }; rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, messagePostProcessor); //rabbitTemplate.convertAndSend(exchangeName, routingKey2, orderId);//多发 }
11.死信队列
死信交换机
package com.rain.config;
@Configuration
public class DeadRabbitMqConfiguration {
//声明死信交换机
@Bean
public DirectExchange DeadOrderExchange(){
return new DirectExchange("dead_direct_exhange",true,false);
}
@Bean
public Queue deadqueue(){
return new Queue("dead.direct.queue", true);
}
@Bean
public Binding binding3(){
return BindingBuilder.bind(deadqueue()).to(DeadOrderExchange()).with("dead");
}
绑定死信队列
package com.rain.config;
@Configuration
public class TTLRabbitMqConfiguration {
@Bean
public Queue TTLQueue(){
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl",5000); //设置过期时间5s
args.put("x-dead-letter-exchange", "dead_direct_exhange"); //死信交换机名字
args.put("x-dead-letter-routing-key", "dead"); //fanout不需要指定key
return new Queue("TTL.queue", true, false, false, args);
}