首页 > 技术文章 > 简单入门Rabbitmq

johnyong 2020-09-23 23:46 原文

什么是RabbitMQ

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。在中小型企业中应用比较多。

RabbitMQ整体架构

1、RabbitMQ的整体架构图

 

 

 

 

 

2、RabbitMQ的整体结构图

 

 

 

 

 

 

3、相关概念

相关概念说明
Broker:标识消息队列服务器实体.相当于安装的RabbitMQ。 Virtual Host:虚拟主机。拥有自己的队列、交换器、绑定和权限机制。相当于在一个broker中实现多租户,一个虚拟主机给一个用户使用。 Queue:消息队列,用来保存消息直到被消费者消费掉(取出消息并回复)。 Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
一般消息提供者不会直接将消息发送到指定的队列中,而是先将消息发送至交换器,交换器再将消息按照交换规则发送到指定队列。 Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,
这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。 Connection:连接,就是与rabbitmq服务的一个socket连接。 Publisher:消息的生产者,也就是是一个向交换器发布消息的服务或者程序。 Consumer:消息的消费者,表示一个从一个消息队列中取得消息的服务或者程序。

 

基于.NetCore简单实现Rabbitmq

在RabbitMQ中通过Exchange分发消息时,根据类型的不同分发策略有区别。目前共四种类型:direct、fanout、topic、headers(headers匹配AMQP消息的header而不是路由键(Routing-key),此外headers交换器和direct交换器完全一致,但是性能差了很多,目前几乎用不到了)。

1、Direct

Direct是指消息发送的策略是路由键与队列名完全匹配。 它是要求队列绑定的exchange的路由键于发送消息到该exchange上的routekey完全一致,才能将消息发送到改队列中。

 

 

 

简单代码实现消息发送

 1        public void PublishDirectMessage(string queueName, string exchangeName, string routeKey, string message)
 2         {
 3 
 4             var factory = new ConnectionFactory()
 5             {
 6                 HostName = _config.Host,
 7                 VirtualHost = _config.VirtualHost,
 8                 UserName = _config.UserName,
 9                 Password = _config.Password,
10             };
11             using (var connection = factory.CreateConnection())
12             {
13                 using (var channel = connection.CreateModel())
14                 {
15                     //定义交换机
16                     channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
17                     //定义队列
18                     channel.QueueDeclare(queueName, true, false, false, null);
19                     //绑定
20                     channel.QueueBind(queueName, exchangeName, routeKey, null);
21                     byte[] data = Encoding.UTF8.GetBytes(message);
22                     //发送消息
23                     channel.BasicPublish(exchangeName, routeKey, null, data);
24                 }
25             }
26         }

 

2、Topic

Topic是一种模糊匹配路由键的策略,它只模糊识别两个通配符:"#"和"*"。#匹配0个或多个单词,*匹配不多不少一个单词。它要求队列绑定的exchange中绑定的路由键模糊匹配发送消息的routekey,即可将消息发送到该队列中。

 

 

 

简单代码实现消息发送

 1        public void PublishTopicMessage(string queueName, string exchangeName, string routeKey, string message)
 2         {
 3 
 4             var factory = new ConnectionFactory()
 5             {
 6                 HostName = _config.Host,
 7                 VirtualHost = _config.VirtualHost,
 8                 UserName = _config.UserName,
 9                 Password = _config.Password,
10             };
11             using (var connection = factory.CreateConnection())
12             {
13                 using (var channel = connection.CreateModel())
14                 {
15                     channel.QueueDeclare(queueName, true, false, false, null);
16                     channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, true, false, null);
17                     channel.QueueBind(queueName, exchangeName, routeKey, null);
18                     byte[] data = Encoding.UTF8.GetBytes(message);
19                     channel.BasicPublish(exchangeName, routeKey, null, data);
20                 }
21             }
22         }

3、Fanout

Fanout是一种广播模式,它不需要指定路由键。凡是与改exchange绑定的队列都能收到该exchange传入的消息

 

 

 简单代码实现消息发送

 1        public void PublishFanoutMessage(string queueName, string exchangeName, string message)
 2         {
 3 
 4             var factory = new ConnectionFactory()
 5             {
 6                 HostName = _config.Host,
 7                 VirtualHost = _config.VirtualHost,
 8                 UserName = _config.UserName,
 9                 Password = _config.Password,
10             };
11             using (var connection = factory.CreateConnection())
12             {
13                 using (var channel = connection.CreateModel())
14                 {
15                     channel.QueueDeclare(queueName, true, false, false, null);
16                     channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, true, false, null);
17 
18                     byte[] data = Encoding.UTF8.GetBytes(message);
19                     channel.BasicPublish(exchangeName, "", null, data);
20                 }
21             }
22         }

 

 

消息消费者代码实现

 1         public void HandleMessage(string queueName)
 2         {
 3 
 4             //创建工厂--建立连接---创建channel--定义事件消费者并绑定事件--指定消费队列
 5             var factory = new ConnectionFactory()
 6             {
 7                 HostName = _config.Host,
 8                 VirtualHost = _config.VirtualHost,
 9                 UserName = _config.UserName,
10                 Password = _config.Password
11             };
12             var connection = factory.CreateConnection();
13             var channel = connection.CreateModel();
14             channel.QueueDeclare(queueName,true,false,false,null);
15             // channel.BasicQos(1,1,false);
16             ///定义事件消费者
17             EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
18             consumer.Received += Consumer_Received;
19             //消费
20             channel.BasicConsume(queueName, false, consumer);
21             //此处不释放channel,应该在事件处理中完成后释放,不然可能存在事件未处理完,而channel释放,导致消息没能被移除
22             //channel.Dispose();
23             //connection.Close();
24         }
25 
26         private void Consumer_Received(object sender, BasicDeliverEventArgs e)
27         {
28             var message = Encoding.UTF8.GetString(e.Body.ToArray());
29             #region 业务处理
30             Console.WriteLine(message);
31             #endregion
32             EventingBasicConsumer consumer = sender as EventingBasicConsumer;
33             //string exchangeName = e.Exchange;
34             //string routeKey = e.RoutingKey; 
35             //设置已经被消费
36             consumer.Model.BasicAck(e.DeliveryTag, false);
37             //释放Channel
38             consumer.Model.Dispose();
39 
40         }

 

总结

以上简单介绍了rabbitmq的相关概念和整体结构。顺便使用代码实现了rabbitmq中的三种exchange策略(代码仅仅是简单实现,并无多大参考价值!!!)。 

 

推荐阅读