首页 > 技术文章 > 从观察者模式到消息总线

johnyong 2020-06-11 01:13 原文

1、观察者模式

观察者模式是一种发布-订阅模式,发布者发布消息,订阅者就可收到消息(也可以说是被观察者状态发生变化时,会通知观察者)。一个发布者可以被多个订阅者订阅,即它定义的是一种一对多的依赖关系。

实现方式:
第一种方式:直接在发布者发布消息时,直接调用订阅者的方法。这种方式导致了类之间高度依赖,耦合度太高,违反了基本的设计原则。所以一般不会使用这种方式
第二种方式:定义事件,在指定的方法中构造依赖关系(只需将订阅者的方法加入到事件中即可)
以下是简单的代码实现,实现的业务是编辑完产品后通知生成不同平台的Listing。

1)定义委托相关

 /// <summary>
    /// 结束编辑
    /// </summary>
    public class FinishEditProductContract
    {
        /// <summary>
        /// item id
        /// </summary>
        public Guid ItemId { get; set; }
        /// <summary>
        /// item 编码
        /// </summary>
        public string Itemcode { get; set; }
        /// <summary>
        /// item 类型
        /// </summary>
        public int ItemType { get; set; }
        /// <summary>
        /// title
        /// </summary>
        public string Title { get; set; }
        /// <summary>
        /// 中文title
        /// </summary>
        public string TitleCN { get; set; }
        /// <summary>
        /// 价格
        /// </summary>
        public decimal Price { get; set; }
        /// <summary>
        /// 图片
        /// </summary>
        public List<string> ImageUrls { get; set; }
        /// <summary>
        /// 描述
        /// </summary>
        public string Description { get; set; }
        /// <summary>
        /// 颜色
        /// </summary>
        public string Color { get; set; }
        /// <summary>
        /// 尺寸
        /// </summary>
        public string Size { get; set; }
        /// <summary>
        /// 关键字
        /// </summary>
        public List<string> Keywords { get; set; } 
    }

    /// <summary>
    /// 创建listing信息
    /// </summary>
    public class CreateListingContract
    { 
        /// <summary>
        /// item id
        /// </summary>
        public Guid ItemId { get; set; }
        /// <summary>
        /// item 编码
        /// </summary>
        public string Itemcode { get; set; }
        /// <summary>
        /// item 类型
        /// </summary>
        public int ItemType { get; set; }
        /// <summary>
        /// title
        /// </summary>
        public string Title { get; set; }
        /// <summary>
        /// 中文title
        /// </summary>
        public string TitleCN { get; set; }
        /// <summary>
        /// 价格
        /// </summary>
        public decimal Price { get; set; }
        /// <summary>
        /// 主图
        /// </summary>
        public string MainImageUrl { get; set; }
        /// <summary>
        /// 描述
        /// </summary>
        public string Description { get; set; }
       
        //.....
    }
View Code

 

    /// <summary>
    /// 生成listing委托
    /// </summary>
    /// <param name="contract"></param>
    public delegate void CreateListingHandler(CreateListingContract contract);

2)定义发布者

 /// <summary>
    /// 产品管理类
    /// </summary>
    public class ItemManager
    {
        /// <summary>
        /// 创建listing事件
        /// </summary>
        public event CreateListingHandler CreateListingEvent;

        /// <summary>
        /// 完成编辑
        /// </summary>
        public void FinishEditing(FinishEditProductContract finishEditProductContract)
        {
            //TODO 完成编辑

            var contract = new CreateListingContract()
            {
                ItemId = finishEditProductContract.ItemId,

                Description = finishEditProductContract.Description,
                Itemcode = finishEditProductContract.Itemcode,
                ItemType = finishEditProductContract.ItemType,
                MainImageUrl = finishEditProductContract.ImageUrls[0],
                Price = finishEditProductContract.Price,
                Title = finishEditProductContract.Title,
                TitleCN = finishEditProductContract.TitleCN,

            };
            //触发生成listing
            if (CreateListingEvent != null)
            {
                CreateListingEvent(contract);
            }
        }
    }

 

3)定义订阅者

    public class WishListingManager
    {
        public void CreateListing(CreateListingContract contract)
        {
            //TODO创建listing
            Console.WriteLine("生成 wish listing ");
        }
    }
    public class AmzListingManager
    {
        public void CreateListing(CreateListingContract contract)
        {
            //TODO创建listing
            Console.WriteLine("生成 amz listing");
        }
    }
    public class AliListingManager
    {
        public void CreateListing(CreateListingContract contract)
        {
            //TODO创建listing
            Console.WriteLine("生成 ali listing");
        }
    }

 

4、定义发布-订阅"控制器"(绑定事件,触发发布消息)

    public class ProductManagerController
    {
        public void FinishEditing()
        {

            var itemManager = new ItemManager();
            var wishListingManager = new WishListingManager();
            itemManager.CreateListingEvent += wishListingManager.CreateListing;
            //不绑定amz
            //var amaListingManager = new AmzListingManager();
            //itemManager.CreateListingEvent += amaListingManager.CreateListing;
            var aliListingManager = new AliListingManager();
            itemManager.CreateListingEvent += aliListingManager.CreateListing;

            var contract = new FinishEditProductContract()
            {
                ItemId = new Guid(""),
                Color = "RED",
                Description = "this is testing description",
                Itemcode = "FBS000801",
                ItemType = 1,
                ImageUrls = new List<string>() { "http://..." },
                Price = 10,
                Size = "XXL",
                Title = "this is testing",
                TitleCN = "简单测试",

            };
            itemManager.FinishEditing(contract);
        }
    }

 

以上就是简单的实现了观察者模式。

总结:

观察者模式在比较小的业务场景中实现是比较方便的。在winform开发中经常可能用到,特别是自定义用户控件时,当某个事件变化时需要通知到不同的控件进行相应的刷新处理。但是对于一些比较复杂而庞大的代码时,维护发布-订阅模式就变得非常困难了。

观察者模式简单通过事件实现代理

* 这种形式的实现,要求发布方必须明确的知道接收方,并将接收方的对应方法绑定到委托中这样才能在发布方发消息时能通知到接收方

* 这种方式的实现必须将对应类进行关联,代码偶尔度高,并且不通用,不同的消息通知要明确的绑定,也不利于扩展;对于对象的生命周期的维护也比较麻烦。

*对于复杂的系统需要对消息进行统一的分发的,则需引入事件总线

 

事件总线(EventBus)

事件总线是指发布消息的统一实现,订阅消息则根据订阅的消息类型自行处理。即约定好了消息的发布方法和消息消费的实现方式。从而进一步解耦发布者与订阅者,实现业务时不用再去关注事件(委托)的绑定,只需调用指定接口发布消息,按照规定定义订阅接口即可。

下面是事件总线的简单实现

1、定义约定消息体,所有具体的消息体都继承与消息基类

    /// <summary>
    ///  消息体接口
    /// </summary>
    public interface IEvent
    {
        /// <summary>
        /// 消息Id
        /// </summary>
        Guid Id { get; set; }
        /// <summary>
        /// 消息时间
        /// </summary>
        DateTime EventTime { get; set; } 
    }
    /// <summary>
    /// 消息基类
    /// </summary>
    public class BaseEventData : IEvent
    {
        public Guid Id { get ; set ; }
        public DateTime EventTime { get  ; set ; }  
    }

 

2、定义消息消费处理类约定,所有的具体消费消息类必须实现IEventBusHandler

/// <summary>
    /// 消息消费约定接口
    /// </summary>
    public interface IHandler
    {

    }
    /// <summary>
    /// 消息消费接口
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public interface IEventHandler<T> : IHandler where T : BaseEventData
    {
        void Handle(T t);

    }
/// <summary>
    /// 消息消费约定接口
    /// </summary>
    public interface IHandler
    {

    }
    /// <summary>
    /// 消息消费接口
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public interface IEventHandler<T> : IHandler where T : BaseEventData
    {
        void Handle(T t);

    }

 

3、封装bus,将消息与消息处理类进行绑定,并约定好统一的消息发布入口

  1     /// <summary>
  2     /// bus
  3     /// </summary>
  4     public class Bus
  5     {
  6         private static object mLock = new object();
  7         private static Bus _Bus = null;
  8         private ConcurrentDictionary<Type, List<Type>> _DicEventHander;
  9         public static Bus DefaultBus
 10         {
 11             get
 12             {
 13 
 14                 if (_Bus == null)
 15                 {
 16                     lock (mLock)
 17                     {
 18                         if (_Bus == null)
 19                         {
 20                             _Bus = new Bus();
 21                         }
 22                     }
 23 
 24                 }
 25                 return _Bus;
 26             }
 27         }
 28         private Bus()
 29         {
 30             _DicEventHander = new ConcurrentDictionary<Type, List<Type>>();
 31             Init();
 32         }
 33         /// <summary>
 34         /// 初始化
 35         /// </summary>
 36         private void Init()
 37         {
 38             var assembly = Assembly.GetEntryAssembly();
 39             var types = assembly.GetTypes().Where(p => p.IsAbstract == false && p.IsInterface == false).ToList();
 40             foreach (var type in types)
 41             {
 42                 if (typeof(IHandler).IsAssignableFrom(type))
 43                 {
 44                     Type handlerInterface = type.GetInterface("IEventHandler`1");//获取该类实现的泛型接口
 45                     if (handlerInterface != null)
 46                     {
 47                         Type eventDataType = handlerInterface.GetGenericArguments()[0]; // 获取泛型接口指定的参数类型
 48 
 49                         if (_DicEventHander.ContainsKey(eventDataType))
 50                         {
 51                             _DicEventHander[eventDataType].Add(type);
 52                         }
 53                         else
 54                         {
 55                             var handlerTypes = new List<Type> { type };
 56                             _DicEventHander[eventDataType] = handlerTypes;
 57                         }
 58                     }
 59                 }
 60             }
 61 
 62         }
 63         /// <summary>
 64         /// 消息发布入口
 65         /// </summary>
 66         /// <typeparam name="T"></typeparam>
 67         /// <param name="data"></param>
 68         public void SendEvent<T>(T data) where T : BaseEventData
 69         {
 70             if (!_DicEventHander.ContainsKey(typeof(T)))
 71             {
 72                 return;
 73             }
 74             var types = _DicEventHander[typeof(T)];
 75             if (types != null && types.Count > 0)
 76             {
 77                 types.ForEach(p =>
 78                 {
 79                     var method = p.GetMethod("Handle");
 80                     if (method != null)
 81                     {
 82                         var handler = Activator.CreateInstance(p);
 83                         method.Invoke(handler, new object[] { data });
 84                     }
 85                 });
 86             }
 87         }
 88         /// <summary>
 89         /// 注册消费类型
 90         /// </summary>
 91         /// <typeparam name="T"></typeparam>
 92         /// <param name="handlerType"></param>
 93         public void Register<T>(Type handlerType) where T : BaseEventData
 94         {
 95             if (_DicEventHander[typeof(T)] != null)
 96             {
 97                 if (_DicEventHander[typeof(T)].Contains(handlerType))
 98                 {
 99                     return;
100                 }
101                 else
102                 {
103                     _DicEventHander[typeof(T)].Add(handlerType);
104                 }
105             }
106             else
107             {
108                 var handlerTypes = new List<Type> { handlerType };
109                 _DicEventHander[typeof(T)] = handlerTypes;
110             }
111 
112         }
113         /// <summary>
114         /// 移除
115         /// </summary>
116         /// <typeparam name="T"></typeparam>
117         /// <param name="handlerType"></param>
118         public void Remove<T>(Type handlerType)
119         {
120             if (_DicEventHander[typeof(T)] != null)
121             {
122                 _DicEventHander[typeof(T)].Remove(handlerType);
123             }
124         }
125 
126     }

 

4、简单实现示例:

定义具体消息类,

/// <summary>
    /// 库存变更消息
    /// </summary>
    public class ChangeInventoryCommand:BaseEventData
    {
        /// <summary>
        /// SKU
        /// </summary>
        public string SKU { get; set; }
        /// <summary>
        /// 数量
        /// </summary>
        public int Quantity { get; set; }

    }
View Code

定义消息处理类

 /// <summary>
    /// 库存变更处理类
    /// </summary>
    public class ChangeInventoryHandler : IEventHandler<ChangeInventoryCommand>
    {
        public void Handle(ChangeInventoryCommand t)
        {

            //TODO库存变化业务
            Console.WriteLine(t.SKU + "库存变化" + t.Quantity.ToString()) ;
        }
    }
View Code

发布消息测试

  /// <summary>
    /// 简单测试
    /// </summary>
    public class SimpleTester
    {
        /// <summary>
        /// 测试-发送库存变更消息
        /// </summary>
        public void Test()
        { 
            ChangeInventoryCommand changeInventoryEvent = new ChangeInventoryCommand() { 
            SKU="SMB000001",
            Quantity=5,
            };
            Bus.DefaultBus.SendEvent<ChangeInventoryCommand>(changeInventoryEvent);
        }
    }
View Code

测试结果

 

 以上就是简单实现eventbus

总结:

事件总线能够使得发布者与订阅者之间解耦。发布者只需要按照约定发布消息即可,无需关注谁订阅了这个消息;而订阅者者只需要订阅指定的消息,在发布者发布该类型消息时触发订阅的方法。发布者与消费者的关系已经在bus中封装好了的,这样我们就可以只关注业务的实现即可。

 

推荐阅读