首页 > 解决方案 > MassTransit 中间件:不带 Automatonymous 的 Saga 的 OnMissingInstance 等效项

问题描述

我试图通过遵循与在 NServiceBus 中实现 Sagas 的方式类似的模式,在不使用自动状态机(我开始使用但发现很难正确进行单元测试)的情况下手动启动 Saga。

但是,我遇到了一个问题,即在初始消息创建实例之前,消息正在命中 saga,这会导致 MassTransit 默默地吞下消息,而不会引发异常或将消息移动到错误队列中。

在试图找出解决这个问题的过程中,很多人建议使用OnMissingInstance错误消息,然后依靠重试框架来有效地延迟它,直到 saga 被正确初始化。 见这里

我想知道是否有一种方法可以在不使用 Automatonymous 框架的情况下做到这一点,很可能是通过利用一些中间件来抢先检查以确保在消息尝试处理之前存在 Saga 消息(抛出异常并稍后重试)它?如果不是,这听起来像是有用/可能的吗?

更多信息:

- 使用 Azure 服务总线
-MongoDB 实现 Saga 持久性。

一些代码简化了我想要的代码片段:

ec.Saga(new MongoDbSagaRepository<CodeProviderSaga>(sagaDatabase, new MongoDbSagaConsumeContextFactory(), nameof(CodeProviderSaga)), config =>
{
    config.UseApplicationInsights(telemetryClient);
    config.UseRetry(retryConfig => retryConfig.Exponential(10, TimeSpan.FromMilliseconds(500), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(1)));
});
public class CodeProviderSaga :
    InitiatedBy<FirstEvent>,
    Orchestrates<SecondEvent>,
    IVersionedSaga
{
    [BsonId]
    public Guid CorrelationId { get; set; }
    public int Version { get; set; }
    public bool SecondEventRecieved { get; set; }
    public DateTimeOffset? LastUpdated { get; set; }

    public Task Consume(ConsumeContext<FirstEvent> context)
    {
        this.LastUpdated = DateTimeOffset.UtcNow;
        return Task.Completed;
    }

    // An exception should be thrown if this is received first so it can be retried but it is silently dropped atm
    public Task Consume(ConsumeContext<SecondEvent> context)
    {
        this.SecondEventRecieved = true;
        this.LastUpdated = DateTimeOffset.UtcNow;

        return Task.Completed;
    }
}

标签: middlewaremasstransitsagaautomatonymous

解决方案


Orchestrates<T>为了解决这个问题,刚刚向开发分支添加了一个提交,该提交对于没有在接口中指定的匹配 saga 的消息引发异常。


推荐阅读