首页 > 技术文章 > .NetCore中简单使用EasyNetQ

johnyong 2020-10-29 00:15 原文

前言

我们在.Net中使用RabbitMQ,最原始的就是基于RabbitMQ.Client进行编码,在这个过程中我们需要通过代码约定和维护队列,Exchange等。如果是自行编码封装通用型的RabbitMQ组件还是比较麻烦的,那么是否已经有比较优秀的RabbitMQ封装组件呢----EasyNetQ就是一个封装比较好并且非常方便使用的组件。

EasyNetQ是什么?

EasyNetQ是在RabbitMQ.Client库之上提供服务的组件集合(基于RabbitMQ.Client的进一步封装)。这些封装包括序列化,错误处理,线程编组,连接管理等。它们由mini-IoC容器组成。我们可以轻松地用自己的实现替换任何组件。EasyNetQ官方网站http://easynetq.com/

以下就是EasyNetQ官网的api设计结构图和部分描述信息

 

.NetCore中简单使用EasyNetQ

注:安装并配置好RabbiMQ(略)

一、注入EasyNetQ

1) 配置RabbitMQ连接信息

"RabbitMqConnetion": "host=localhost;virtualHost=test_vh;username=qingy;password=r3295"

 

 

 

2)在startup.cs中注入EasyNetQ

            //注入easynetQ
            string mqConnetion = Configuration["RabbitMqConnetion"];
            services.AddSingleton(RabbitHutch.CreateBus(mqConnetion));

 

 

 

 二、定义消息实体,并实现发送接收消息

1)、定义消息实体和对应业务处理类

   [Queue("qingy.SimpleMessaggeCommand", ExchangeName = "qingy.SimpleMessaggeCommand")]
    public class SimpleMessaggeCommand
    {
        /// <summary>
        /// Id
        /// </summary>
        public Guid Id { get; set; }
        /// <summary>
        /// Message
        /// </summary>
        public string Message { get; set; }
        /// <summary>
        /// 消息时间
        /// </summary>
        public DateTime MessageTime { get; set; }
    }

其中[Queue("qingy.SimpleMessaggeCommand", ExchangeName = "qingy.SimpleMessaggeCommand")],可以不指定。easynetq会自动生成相应的queue和exchange,但是为了便于阅读还是指定名称比较好。

 

订阅消息业务处理

 1     public class MyService : IConsumeAsync<SimpleMessaggeCommand>
 2     {
 3         /// <summary>
 4         /// 处理SimpleMessaggeCommand
 5         /// </summary>
 6         /// <param name="message"></param>
 7         /// <returns></returns>
 8         [AutoSubscriberConsumer(SubscriptionId = "Simple.SimpleMessagge")]
 9         [SubscriptionConfiguration(PrefetchCount = 20)]
10         public Task ConsumeAsync(SimpleMessaggeCommand message)
11         {
12             //TODO logic here
13             Console.WriteLine($"{message.Id.ToString()}---{message.Message}--{message.MessageTime.ToString("yyyy-MM-dd HH:mm:ss")}");
14             return Task.CompletedTask;
15         }
16     }
 [AutoSubscriberConsumer(SubscriptionId = "Simple.SimpleMessagge")]  声明subscriptionid,如果确定只有一个订阅者,可以不声明。
 [SubscriptionConfiguration(PrefetchCount = 20)]       声明并发处理的数量,一般调试时可以设置为1,进行单条调试测试。

2)注入消息订阅

 

 1         /// <summary>
 2         /// 注入消息订阅
 3         /// </summary>
 4         /// <param name="app"></param>
 5         /// <param name="subscriptionPrefix"></param>
 6         /// <param name="assembly"></param>
 7         /// <returns></returns>
 8         public static IApplicationBuilder UseMqSubscribe(this IApplicationBuilder app, string subscriptionPrefix, Assembly assembly)
 9         {
10             var services = app.ApplicationServices;
11             var lifeTime = services.GetService<IHostApplicationLifetime>();
12             var bus = services.GetService<IBus>();
13             lifeTime.ApplicationStarted.Register(() =>
14             {
15                 var subscriber = new AutoSubscriber(bus, subscriptionPrefix) {
16                     
17                 };
18                 subscriber.Subscribe(assembly);
19                 subscriber.SubscribeAsync(assembly);
20             });
21             lifeTime.ApplicationStopped.Register(()=> {
22                 bus.Dispose();
23             });
24             return app;
25         }

 

在startup.cs的Configure方法中添加

            var ass = Assembly.LoadFrom(Path.Combine(AppContext.BaseDirectory, "Qingy.HubServie.dll")); 
            app.UseMqSubscribe("Simple", ass);

 

三、运行测试

 

 

 

 

 

推荐阅读