首页 > 技术文章 > Abp vNext 事件总线(Event Bus)

easy5weikai 2022-05-20 12:29 原文

Abp vNext 事件总线(Event Bus)

本地事件总线

文档:https://docs.abp.io/zh-Hans/abp/latest/Local-Event-Bus

分布式事件总线

文档:https://docs.abp.io/zh-Hans/abp/latest/Distributed-Event-Bus

看下官方文档的说明:

使用本地事件总线作为默认具有一些重要的优点. 最重要的是:它允许你编写与分布式体系结构兼容的代码. 您现在可以编写一个整体应用程序,以后可以拆分成微服务. 最好通过分布式事件而不是本地事件在边界上下文之间(或在应用程序模块之间)进行通信.

解读:

当一个类中使用了分布式EventBus,如下代码所示:

public class MqttConnectionService : IMqttConnectionService, ISingletonDependency
{
    private readonly IDistributedEventBus _distributedEventBus;
    
    public MqttConnectionService(IDistributedEventBus distributedEventBus)
    {
        _distributedEventBus = distributedEventBus;
    }
    // ...
    public async Task HandleClientConnectedAsync(MqttServerClientConnectedEventArgs eventArgs)
    {
        await _distributedEventBus.PublishAsync(
           new ClientConnectedEto
           {
               ClientId = eventArgs.ClientId
           }
        );
    }
    
   // ...
}

如果没有集成分布式EventBus 的组件,比如集成 RabbitMqDistributedEventBus 。

那么默认使用本地EventBus,

千言万语,就一句话:

使用分布式事件总线:IDistributedEventBus _distributedEventBus ,代码能同时兼容两种事件总线

(1)没提供分布式事件总线,就使用本地事件总线

(2)提供分布式事件总线,就使用分布式事件总线。

实操

实操的分布式事件总线示意图:

创建微服务

创建微服务 IotHub

abp new Artisan.IotHub -u none -d ef -dbms SqlServer --separate-identity-server --version 5.2.1

创建微服务 IotEdge

abp new Artisan.IotEdge -u none -d ef -dbms SqlServer --separate-identity-server --version 5.2.1

添加事件对象

在项目【Artisan.IotHub.Application.Contracts】中添加 Eto,

代码清单:Artisan.IotHub.Application.Contracts/Etos/ClientConnectedEto.cs

    /// <summary>
    /// EventName 属性是可选的,但是建议使用。
    /// 如果没有为事件类型(ETO 类)声明它,
    /// 则事件名称将是事件类的全名,即: Artisan.IotHub.Etos.ClientConnectedEto
    /// 参见:Abp:Distributed Event Bus: https://docs.abp.io/en/abp/latest/Distributed-Event-Bus
    /// 
    /// 客户端上线
    /// 
    /// </summary>
    [EventName(MqttServerEvents.ClientConnected)]
    public class ClientConnectedEto
    {
        public string ClientId { get; set; }
    }

代码清单:Artisan.IotHub.Application.Contracts/Events/MqttServerEvents.cs

    public static class MqttServerEvents
    {
        private const string Prefix = "Artisan.IotHub.MqttServer";
        public const string ClientConnected = $"{Prefix}.ClientConnected";
     }

添加事件处理器

代码清单:Artisan.IotHub.Application/Services/ClientConnectedEventHandler.cs

using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;

namespace Artisan.IotHub.Services
{
    public class ClientConnectedEventHandler : 
        IDistributedEventHandler<ClientConnectedEto>, 
        ITransientDependency
    {
        private readonly ILogger<ClientConnectedEventHandler> _logger;

        public ClientConnectedEventHandler(ILogger<ClientConnectedEventHandler> logger)
        {
            _logger = logger;
        }

        public Task HandleEventAsync(ClientConnectedEto eventData)
        {
            var clientId = eventData.ClientId;
            _logger.LogInformation($"Client:{clientId} has connected the broker");

            //TODO:其它业务逻辑
            return Task.CompletedTask;
        }
    }
}

发布事件

代码清单:Artisan.IotHub.Application/Mqtts/MqttConnectionService.cs

public class MqttConnectionService : IMqttConnectionService, ISingletonDependency
{
    private readonly IDistributedEventBus _distributedEventBus;
    
    public MqttConnectionService(IDistributedEventBus distributedEventBus)
    {
        _distributedEventBus = distributedEventBus;
    }
    // ...
    public async Task HandleClientConnectedAsync(MqttServerClientConnectedEventArgs eventArgs)
    {
        await _distributedEventBus.PublishAsync(
           new ClientConnectedEto
           {
               ClientId = eventArgs.ClientId
           }
        );
    }
    
   // ...
}

当发布事件后,微服务Iothub中的事件处理器ClientConnectedEventHandler中的HandleEventAsync()方法将会被执行。

集成 RabbitMQ 分布式事件总线

文档:https://docs.abp.io/zh-Hans/abp/latest/Distributed-Event-Bus-RabbitMQ-Integration

到目前为止,使用的还是默认的本地事件总线!即使代码中使用的是分布式事件总线:

        await _distributedEventBus.PublishAsync(
           new ClientConnectedEto
           {
               ClientId = eventArgs.ClientId
           }
        );

Abp 支持的分布式事件总线种类见:https://docs.abp.io/zh-Hans/abp/latest/Distributed-Event-Bus

这里我们使用RabbitMQ 分布式事件总线 ,具体步骤如下:

安装 RabbitMQ

如何安装 RabbitMQ ,参见:https://www.cnblogs.com/easy5weikai/p/16217858.html

集成

添加包

在项目【Artisan.IotHub.HttpApi.Host】中引用如下包:

   <PackageReference Include="Volo.Abp.EventBus.RabbitMQ" Version="5.2.1" />
添加模块依赖

然后在 IotHubHttpApiHostModule中添加模块依赖:

代码清单:Artisan.IotHub.HttpApi.Host/IotHubHttpApiHostModule.cs

[DependsOn(
    // ...
    typeof(AbpEventBusRabbitMqModule)
)]
public class IotHubHttpApiHostModule : AbpModule
{
    // ...
}
添加配置

在项目【Artisan.IotHub.HttpApi.Host】的配置文件:appsettings.json 中添加如下配置:

  "RabbitMQ": {
    "Connections": {
      "Default": {
        "HostName": "localhost"
      }
    },
    "EventBus": {
      "ClientName": "Artisan_Iot_Hub",
      "ExchangeName": "Artisan_Iot"
    }
  }

发布事件

再次调用:

        await _distributedEventBus.PublishAsync(
           new ClientConnectedEto
           {
               ClientId = eventArgs.ClientId
           }
        );

微服务IotHub的事件处理器ClientConnectedEventHandler中的HandleEventAsync()方法将会被执行。

登录 RabbitMQ 的管理后台:http://localhost:15672

在 【Exchanges】选项卡中能看到在项目中配置名为:Artisan_Iot 的 Exchange(交换机),如下图所示:

点进去能查看到它名下的路由消息,如下图所示:

有一个名为: Artisan.IotHub.MqttServer.ClientConnected 的Routing key,这就是定义事件对象时使用的的事件名称:

    [EventName(MqttServerEvents.ClientConnected)]
    public class ClientConnectedEto
    {
        public string ClientId { get; set; }
    }

MqttServerEvents.ClientConnected的值为:Artisan.IotHub.MqttServer.ClientConnected

跨服务使用 :IotEdge服务

到目前为止,事件还是在同一个项目进程中使用,那如何在跨服务进程中使用分布式事件总线呢?

添加包

在项目【Artisan.IotEdge.HttpApi.Host】中引用如下包:

   <PackageReference Include="Volo.Abp.EventBus.RabbitMQ" Version="5.2.1" />

添加模块依赖

然后在 IotEdgeHttpApiHostModule中添加模块依赖:

代码清单:Artisan.IotEdge.HttpApi.Host/IotEdgeHttpApiHostModule.cs

[DependsOn(
    // ...
    typeof(AbpEventBusRabbitMqModule)
)]
public class IotEdgeHttpApiHostModule : AbpModule
{
    // ...
}

添加配置

在项目【Artisan.IotEdge.HttpApi.Host】的配置文件:appsettings.json 中添加如下配置:

  "RabbitMQ": {
    "Connections": {
      "Default": {
        "HostName": "localhost"
      }
    },
    "EventBus": {
      "ClientName": "Artisan_Iot_Edge",
      "ExchangeName": "Artisan_Iot"
    }
  }

注意:

这的 ClientName是:Artisan_Iot_Edge

ExchangeName与微服务 IotHub 的配置保持一致,即:Artisan_Iot

添加事件对象

在项目【Artisan.IotEdge.Application.Contracts】中添加 Eto,

或者建立一个共享(Shared)的类库项目,让两个服务IotHub 和 IotEdge 共享这些类对象。

代码清单:Artisan.IotEdge.Application.Contracts/Etos/ClientConnectedEto.cs

    /// <summary>
    /// EventName 属性是可选的,但是建议使用。
    /// 如果没有为事件类型(ETO 类)声明它,
    /// 则事件名称将是事件类的全名,即: Artisan.IotEdge.Etos.ClientConnectedEto
    /// 参见:Abp:Distributed Event Bus: https://docs.abp.io/en/abp/latest/Distributed-Event-Bus
    /// 
    /// 客户端上线
    /// 
    /// </summary>
    [EventName(MqttServerEvents.ClientConnected)]
    public class ClientConnectedEto
    {
        public string ClientId { get; set; }
    }

代码清单:Artisan.IotEdge.Application.Contracts/Events/MqttServerEvents.cs

    public static class MqttServerEvents
    {
        private const string Prefix = "Artisan.IotHub.MqttServer";
        public const string ClientConnected = $"{Prefix}.ClientConnected";
     }

添加事件处理器

代码清单:Artisan.IotEdge.Application/Services/ClientConnectedEventHandler.cs

using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;

namespace Artisan.IotEdge.Services
{
    public class ClientConnectedEventHandler : 
        IDistributedEventHandler<ClientConnectedEto>, 
        ITransientDependency
    {
        private readonly ILogger<ClientConnectedEventHandler> _logger;

        public ClientConnectedEventHandler(ILogger<ClientConnectedEventHandler> logger)
        {
            _logger = logger;
        }

        public Task HandleEventAsync(ClientConnectedEto eventData)
        {
            var clientId = eventData.ClientId;
            _logger.LogInformation($"Client:{clientId} has connected the broker");

            //TODO:其它业务逻辑
            return Task.CompletedTask;
        }
    }
}

发布事件

调用微服务【IotHub】的发布事件方法:

        await _distributedEventBus.PublishAsync(
           new ClientConnectedEto
           {
               ClientId = eventArgs.ClientId
           }
        );
  1. 服务【IotHub】的事件处理器ClientConnectedEventHandler中的HandleEventAsync()方法将会被执行。
  2. 服务【IotEdge】的事件处理器ClientConnectedEventHandler中的HandleEventAsync()方法也会被执行。

分布式事件总线示意图

事件是持久化的

分布式事件是被持久化的,即使某个微服务掉线了,重新上线后,它的事件处理器会被调用。

下做个实验来验证。

第一步:启动微服务IotHub,但是不要启动微服务IotEdge;

第二步:微服务IotHub中发布事件:

        await _distributedEventBus.PublishAsync(
           new ClientConnectedEto
           {
               ClientId = eventArgs.ClientId
           }

这时,微服务IotHub的事件处理器ClientConnectedEventHandler中的HandleEventAsync()方法将会被执行。

此时查看 RabbitMQ管理后台,如下图所示:

从上图可以看到:

微服务 IotEdge 由于处于离线状态的,故它订阅的事件没有被处理,RabbitMQ 将它订阅的事件做了持久化,如下图所示:

Message.Ready =1

第三步:我们先在微服务IotEdge的事件处理器ClientConnectedEventHandler中的HandleEventAsync()方法中打个端点,然后再启动微服务IotEdge。

微服务IotEdge启动后,它的处理器ClientConnectedEventHandler中的HandleEventAsync()方法立马被执行,示意图如下所示:

第四步:继续执行后面的代码,

如果,我们打了断点,并停留了很长的时间,即超时了,RabbitMQ 会认为事件处理失败,会再次调用事件处理器。

当处理器ClientConnectedEventHandler中的HandleEventAsync()方法执行完后,RabbitMQ 在等待回复,

此时的 Message.Unacked=1, 如下图所示:

过了一会,RabbitMQ 确认执行完成后,如下图所示:

Message.Unacked=0

推荐阅读