首页 > 解决方案 > UnitOfWork 和 DbContext:DI 的线程安全

问题描述

我正在开发一个 .NET Core 2.2 控制台应用程序,该应用程序托管一个IHostedService

public class MqttClientHostedService : IHostedService, IDisposable
{
    [...]
    public MqttClientHostedService(
        ILogger<MqttClientHostedService> logger, 
        IOptions<MqttClientConfiguration> mqttConfiguration, 
        IPositionService positionService)
    {
        this.logger = logger;
        this.config = mqttConfiguration;
        this.positionService = positionService;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        mqttClient = new MqttFactory().CreateMqttClient();
        mqttClient.Connected += async (s, e) => await MqttClient_Connected(s, e);
        mqttClient.ApplicationMessageReceived +=
            async (s, e) => await MqttClient_ApplicationMessageReceived(s, e);

        await mqttClient.ConnectAsync(
            new MqttClientOptionsBuilder()
                .WithTcpServer(config.Value.Host, config.Value.Port).Build());
    }

    private async Task MqttClient_ApplicationMessageReceived(
        object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
        await positionService.HandleMessage(message);
    }

    [...]
}

IPositionService是一个检查消息并检查它是否可以保存在我们的数据库中的管理器:

public class PositionService : IPositionService
{
    [...]

    public PositionService(
        IUnitOfWork unitOfWork, ILogger<PositionService> logger)
    {
        this.unitOfWork = unitOfWork;
        this.logger = logger;
    }

    public async Task HandleMessage(string message)
    {
        Entity entity = await unitOfWork.EntityRepository.GetByMessage(message);

        [...]

        await unitOfWork.EntityRepository.UpdateAsync(entity);
        await unitOfWork.Save();
    }

    [...]
}

IUnitOfWork是 Entity Framework Core 的包装器DbContext(请不要评判我,我有理由这样做):

public class UnitOfWork : IUnitOfWork
{
    [...]

    public UnitOfWork(MyContext myContext)
    {
        this.myContext = myContext;

        EntityRepository = new EFRepository<Entity>(myContext);
    }

    public async Task Save()
    {
        await myContext.SaveChangesAsync();
    }
}

EFRepository<T>,实现IRepository<T>接口,是一个包装器DbSet<T>(同样,请不要评判我)。这里没有相关代码。

控制台应用程序的 Program.cs 配置如下:

[...]
.ConfigureServices((hostContext, services) =>
{
    services.AddDbContext<MyContext>(
        c => c.UseSqlServer("[...]", options => options.UseNetTopologySuite()),
        ServiceLifetime.Transient);

    services.AddTransient<IPositionService, PositionService>();
    services.AddTransient(typeof(IRepository<>), typeof(EFRepository<>));
    services.AddTransient<IUnitOfWork, UnitOfWork>();

    services.AddHostedService<MqttClientHostedService>();

    [...]
});

问题是PositionService.HandleMessage每秒被调用多次,并且DbContext不是线程安全的,我收到以下错误消息:

在前一个操作完成之前,在此上下文上开始了第二个操作。

我通过IUnitOfWorkPositionService的依赖项中删除,而不是注入一个IServiceScopeFactory,并执行以下操作解决了这个问题:

using (IServiceScope serviceScope = serviceScopeFactory.CreateScope())
{
    IUnitOfWork unitOfWork = serviceScope.ServiceProvider.GetService<IUnitOfWork>();

    [...]
}

这种方法有效,但我不喜欢它。这似乎是一个技巧,我不喜欢我PositionService知道Dependency Injection并且必须处理范围的事实。

我的问题是:有没有更好的方法来解决这个问题而不接触我的课程?我应该使整个UnitOfWork线程安全吗?或者也许不使用 DI 手动创建它?

标签: entity-frameworkdependency-injection.net-corethread-safetyunit-of-work

解决方案


问题的根源是MyContext在以下对象图中被俘虏作为俘虏依赖:

MqttClientHostedService
    -> PositionService
        -> UnitOfWork
            -> MyContext 

此图中的所有类型都注册为Transient,但充当托管服务的服务(例如 your MqttClientHostedService)在应用程序期间仅解析一次并无限期地缓存。这有效地使他们成为单身人士。

换句话说,MyContext它意外地被单个消息保持活动状态,MqttClientHostedService并且由于多个消息可以并行出现,您自己就有了竞争条件。

解决方案是让每个ApplicationMessageReceived事件在其自己独特的小气泡(范围)中运行,并IPositionService从该气泡内解决一个新的问题。例如:

public class MqttClientHostedService : IHostedService, IDisposable
{
    [...]
    public MqttClientHostedService(
        ILogger<MqttClientHostedService> logger, 
        IOptions<MqttClientConfiguration> mqttConfiguration, 
        IServiceProvider provider)
    {
        this.logger = logger;
        this.config = mqttConfiguration;
        this.provider = provider;
    }
    [...]
    private async Task MqttClient_ApplicationMessageReceived(
        object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        using (var scope = provider.CreateScope())
        {
            positionService = scope.ServiceProvider
                .GetRequiredService<IPositionService>();
            string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            await positionService.HandleMessage(message);
        }
    }

    [...]
}

推荐阅读