首页 > 解决方案 > 如何使用存储引擎持久化 Saga 实例并避免竞争条件

问题描述

我尝试使用持久化 Saga 实例RedisSagaRepository;我想在负载平衡设置中运行 Saga,所以我不能使用InMemorySagaRepository. 然而,在我切换之后,我注意到消费者发布的一些事件没有被 Saga 处理。我检查了队列,没有看到任何消息。

我注意到,当消费者几乎没有时间处理命令和发布事件时,它可能会发生。InMemorySagaRepository如果我使用或添加将不会出现此Task.Delay()问题Consumer.Consume()

我使用不正确吗?

另外,如果我想在负载平衡设置中运行 Saga,并且如果 Saga 需要使用字典发送多个相同类型的命令来跟踪完整性(类似于处理多个事件的状态转换中的逻辑)。当多个消费者同时发布事件时,如果两个 Sagas 同时处理两个不同的事件,我会出现竞争条件吗?在这种情况下,State 对象中的 Dictionary 是否会正确设置?

代码可在此处获得

SagaService.ConfigureSagaEndPoint()InMemorySagaRepository是我在和之间切换的地方RedisSagaRepository

private void ConfigureSagaEndPoint(IRabbitMqReceiveEndpointConfigurator endpointConfigurator)
{
    var stateMachine = new MySagaStateMachine();

    try

    {
        var redisConnectionString = "192.168.99.100:6379";
        var redis = ConnectionMultiplexer.Connect(redisConnectionString);

        ///If we switch to RedisSagaRepository and Consumer publish its response too quick,
        ///It seems like the consumer published event reached Saga instance before the state is updated
        ///When it happened, Saga will not process the response event because it is not in the "Processing" state
        //var repository = new RedisSagaRepository<SagaState>(() => redis.GetDatabase());
        var repository = new InMemorySagaRepository<SagaState>();

        endpointConfigurator.StateMachineSaga(stateMachine, repository);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}

LeafConsumer.Consume 是我们添加 Task.Delay() 的地方

public class LeafConsumer : IConsumer<IConsumerRequest>
{
    public async Task Consume(ConsumeContext<IConsumerRequest> context)
    {
        ///If MySaga project is using RedisSagaRepository, uncomment await Task.Delay() below
        ///Otherwise, it seems that the Publish message from Consumer will not be processed
        ///If using InMemorySagaRepository, code will work without needing Task.Delay
        ///Maybe I am doing something wrong here with these projects
        ///Or in real life, we probably have code in Consumer that will take a few milliseconds to complete
        ///However, we cannot predict latency between Saga and Redis
        //await Task.Delay(1000);

        Console.WriteLine($"Consuming CorrelationId = {context.Message.CorrelationId}");
        await context.Publish<IConsumerProcessed>(new
        {
            context.Message.CorrelationId,
        });
    }
}

标签: state-machinemasstransitsagaautomatonymous

解决方案


当您以这种方式发布事件,并使用具有非事务性 saga 存储库(例如 Redis)的多个服务实例时,您需要设计您的 saga,以便 Redis 使用和强制执行唯一标识符。这可以防止创建同一 saga 的多个实例。

您还需要接受超出“预期”状态的事件。例如,在接收到另一个事件之前,期望接收一个开始,这会将 saga 置于处理状态,这很可能会失败。建议允许由任何事件序列启动 saga(最初,在 Automatonymous 中),以避免无序的消息传递问题。只要事件都将表盘从左向右移动,就会达到最终状态。如果在较晚的事件之后接收到较早的事件,则不应将状态向后移动(或在此示例中向左移动),而仅将信息添加到 saga 实例并将其保留在较晚的状态。

如果在不同的服务实例上处理两个事件,它们都会尝试将 saga 实例插入 Redis,这将作为重复实例失败。然后该消息应该重试(将 UseMessageRetry() 添加到您的接收端点),然后它将拾取现在存在的 saga 实例并应用该事件。


推荐阅读