首页 > 解决方案 > 如何使用 Mediator 配置 MassTransit 以发布消息?

问题描述

我是MassTransitMediator的新手,我有一系列事件要按顺序执行,我在进程中和内存中使用MassTransit,对于我的用例,不需要传输。

我想通过 Mediator 向消费者、sagas、活动发送和发布消息,我有下面的代码,但我想通过在以下位置注册 MassTransit 来改进它startup.cs

//asp net core 3.1 Controller

 [ApiController]
 public class MyController : ControllerBase
 {    
    private readonly IProductService _productService ;
    private readonly IMediator mediator;
    public MyController(IProductService productService)
    {
       _productService = productService;
      var repository = new InMemorySagaRepository<ApiSaga>();
      mediator = Bus.Factory.CreateMediator(cfg =>
      {
          cfg.Saga<ProductSaga>(repository);
      });
    }
   
     [HttpPost]
     public async Task<IActionResult> Post([FromBody] ProductContract productContract)
     {            
         try
         {
             var result = await _productService.DoSomeThingAsync(productContract);
             await mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = Guid.NewGuid(), result.Label });
         return Ok();
         }
         catch (Exception ex)
         {
             return BadRequest(ex.Message);
         }
      }
 }

//My saga
public class ProductSaga :
        ISaga,
        InitiatedBy<ProductSubmittedEvent>
    {
        public Guid CorrelationId { get; set; }
        public string State { get; private set; } = "Not Started";

        public Task Consume(ConsumeContext<ProductSubmittedEvent> context)
        {
            var label= context.Message.Label;
            State = "AwaitingForNextStep";
            //...
           //send next command
        }
    }

像这样它可以工作,但不正确,我想在我的 Mediator 中配置masstransitstartup.cs以获得一个适当的实例,为此我首先删除IMediator,使用 anIPublishEndpoint将消息发布到Saga并配置 my startup.cs,但它不起作用预期的:

//startup.cs

 public void ConfigureServices(IServiceCollection services)
 {
      services.AddMediator(cfg =>
            {
                cfg.AddSaga<ProductSaga>().InMemoryRepository();
            });
 }

//in controller using:
private readonly IPublishEndpoint _publishEndpoint;

//then
await _publishEndpoint.Publish<ProductSubmittedEvent>(
    new { CorrelationId = Guid.NewGuid(), result.Label });

我有一个System.InvalidOperationException

尝试激活“GaaS.API.Controllers.ManageApiController”时无法解析“MassTransit.IPublishEndpoint”类型的服务。

我试图更新我的startup.cs

var repository = new InMemorySagaRepository<ApiSaga>();
            services.AddMassTransit(cfg =>
            {
                cfg.AddBus(provider =>
                {
                    return Bus.Factory.CreateMediator(x =>
                    {
                        x.Saga<ProductSaga>(repository);
                    });
                });
            });

我有:

无法将类型“MassTransit.Mediator.IMediator”隐式转换为“MassTransit.IBusControl”。

如果您有任何推荐想法,谢谢分享和挑战我

标签: c#dependency-injectionmasstransit

解决方案


在您的项目中配置 MassTransit Mediator 的正确方法是通过Startup.cs您似乎已经尝试过的文件。

public void ConfigureServices(IServiceCollection services)
{
    services.AddMediator(cfg =>
    {
        cfg.AddSaga<ProductSaga>().InMemoryRepository();
    });
}

使用调解器,您需要依赖IMediator接口。您不能使用IPublishEndpointor ISendEndpointProvider,因为它们是总线接口。由于您可以在容器中同时拥有调解器和总线实例,因此在从容器中解析服务时会导致混乱。

[ApiController]
public class MyController : ControllerBase
{    
    private readonly IProductService _productService ;
    private readonly IMediator _mediator;

    public MyController(IProductService productService, IMediator mediator)
    {
        _productService = productService;
        _mediator = mediator;
    }

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] ProductContract productContract)
    {            
        try
        {
            var result = await _productService.DoSomeThingAsync(productContract);
            
            await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });

            return Ok();
        }
        catch (Exception ex)
        {
            return BadRequest(ex.Message);
        }
    }
}

如果您只使用调解器,并且想要使用IPublishEndpoint,您可以自己将其添加到容器中并委托它。

services.AddSingleton<IPublishEndpoint>(provider => provider.GetService<IMediator>());

推荐阅读