Concurrency exception in a Rebus saga

Problem description

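While working through the Crm system saga sample from the Rebus documentation, I get a ConcurrencyException with the details below when the events LegalInfoAcquiredInFirstSystem and LegalInfoAcquiredInSecondSystem are handled.

The error details are as follows: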

message_id: 150d3d50-1de4-4a2f-bd60-660fc441412e
delivery_mode: 2
headers:
rbs2-content-type: application/json;charset=utf-8
rbs2-corr-id: 0848197f-3ebb-442a-a80b-49c4c30dc0ca
rbs2-corr-seq: 2
rbs2-error-details: System.AggregateException: 1 unhandled exceptions (Update of saga with ID 95395c60-cf2e-48da-89ff-fdf192ce53b9 did not succeed because someone else beat us to it) ---> Rebus.Exceptions.ConcurrencyException: Update of saga with ID 95395c60-cf2e-48da-89ff-fdf192ce53b9 did not succeed because someone else beat us to it
   at Rebus.SqlServer.Sagas.SqlServerSagaStorage.Update(ISagaData sagaData, IEnumerable`1 correlationProperties)
   at Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, Boolean insert)
   at Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, Boolean insert)
   at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Retry.FailFast.FailFastStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func`1 next, String identifierToTrackMessageBy, ITransactionContext transactionContext, String messageId, String secondLevelMessageId)
   --- End of inner exception stack trace ---
rbs2-intent: pub
rbs2-msg-id: 150d3d50-1de4-4a2f-bd60-660fc441412e
rbs2-msg-type: Crm.Messages.Events.LegalInfoAcquiredInFirstSystem, Crm.Messages.Events
rbs2-return-address: RebusQueue
rbs2-sender-address: RebusQueue
rbs2-senttime: 2021-08-07T23:27:49.8601606+03:00
rbs2-source-queue: RebusQueue
content_encoding: utf-8
content_type: application/json
Payload 100 bytes Encoding: string
{"$type":"Crm.Messages.Events.LegalInfoAcquiredInFirstSystem, Crm.Messages.Events","CorrId":"70001"}

My Rebus configuration is as follows:

```csharp
public void ConfigureServices(IServiceCollection services)
{
    AppSettings settings = new AppSettings();
    Configuration.Bind(settings);

    services.AutoRegisterHandlersFromAssemblyOf<AcquireLegalInformationFromFirstSystemHandler>();
    services.AddControllers();
    services.AddLogging(logging => logging.AddConsole());

    services.AddRebus((configure, serviceProvider) => configure
        .Transport(x =>
        {
            x.UseRabbitMq($"amqp://{settings.Settings.UserName}:{settings.Settings.Password}@{settings.Settings.HostName}", settings.Settings.EndpointQueueName);
        })
        .Options(o => o.SetBusName("RebusSaga"))
        .Options(o => o.SimpleRetryStrategy(errorQueueAddress: settings.Settings.ErrorQueueName, maxDeliveryAttempts: 1, secondLevelRetriesEnabled: false))
        .Sagas(s =>
        {
            s.StoreInSqlServer(settings.ConnectionStrings.RebusContext, "Sagas", "SagaIndex", true);
        })
        .Timeouts(s => s.StoreInSqlServer(settings.ConnectionStrings.RebusContext, "Timeouts", true))
        .Routing(r => r.TypeBased()
            .MapAssemblyOf<Crm.Messages.Events.CustomerCreated>(settings.Settings.EndpointQueueName)
            .MapFallback("RebusErrors"))
    );
}
```

The error occurs in the saga handlers below:

```csharp
// Both handlers update the same saga instance, so they can race
// when the two events arrive at almost the same time.
public async Task Handle(LegalInfoAcquiredInFirstSystem first)
{
    Data.GotLegalInfoFromFirstSystem = true;

    await PossiblyPerformCompleteAction();
}

public async Task Handle(LegalInfoAcquiredInSecondSystem second)
{
    Data.GotLegalInfoFromSecondSystem = true;

    await PossiblyPerformCompleteAction();
}

// Completes the saga once both systems have reported back.
async Task PossiblyPerformCompleteAction()
{
    if (Data.GotLegalInfoFromFirstSystem && Data.GotLegalInfoFromSecondSystem)
    {
        await bus.Publish(new CustomerIsLegallyOk { CrmCustomerId = Data.CrmCustomerId });

        MarkAsComplete();
    }
}
```

What is the likely source of this error? Thanks.

Tags: saga, rebus

Solution


I finally solved the problem by adding s.EnforceExclusiveAccess(); to the saga options.
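For reference, here is a sketch of where that call would probably go in the configuration from the question. It is only the .Sagas(...) part of the AddRebus chain, not a complete program, and it assumes the EnforceExclusiveAccess extension is available in the installed Rebus version. The likely cause of the exception is that the two events are handled concurrently for the same saga instance: Rebus saves saga data with optimistic concurrency, so the second update fails, and with maxDeliveryAttempts: 1 the failed message is not retried but goes straight to the error queue.

```csharp
.Sagas(s =>
{
    s.StoreInSqlServer(settings.ConnectionStrings.RebusContext, "Sagas", "SagaIndex", true);

    // Handle messages that correlate to the same saga instance one at a time,
    // so two handlers no longer try to update the same saga data concurrently.
    s.EnforceExclusiveAccess();
})
```

As far as I know, this form of exclusive access is enforced inside a single process, so it may not be enough if several instances of the endpoint consume from the same queue. In that case, allowing more than one delivery attempt so that the losing message is retried could be worth trying as well.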

