首页 > 解决方案 > Azure Webjob 中的“SqlConnection 不支持并行事务”

问题描述

我没有在我的 C# .NET Core v3.1 中明确使用带有 EFCore v3 代码的事务,并且一切正常。除了我的 Azure Webjob。它听一个队列。当队列中有多条消息并因此函数被并行调用多次时,我会遇到事务错误。

我的网络作业从存储中读取文件并将内容保存到数据库表中。我也使用了 Sharding 机制:每个客户端都有自己的数据库。

我尝试使用TransactionScope,但后来出现其他错误。我发现的示例使用TransactionScope并打开连接并以一种方法进行保存。我将这些部分分成几种方法,使我不清楚如何使用TransactionScope.

这是一些代码:

ImportDataService.cs:
  //using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
  await using var tenantContext = await _tenantFactory.GetContextAsync(clientId, true);
  await tenantContext.Foo.AddRangeAsync(dboList, cancellationToken);
  await tenantContext.SaveChangesAsync(cancellationToken);
  //scope.Complete();

TenantFactory.cs:
public async Task<TenantContext> GetContextAsync(int tenantId, bool lazyLoading = false)
{
    _tenantConnection = await _sharding.GetTenantConnectionAsync(tenantId);
    var optionsBuilder = new DbContextOptionsBuilder<TenantContext>();
    optionsBuilder.UseLoggerFactory(_loggerFactory);
    if (lazyLoading) optionsBuilder.UseLazyLoadingProxies();
    optionsBuilder.UseSqlServer(_tenantConnection,
        options => options.MinBatchSize(5).CommandTimeout(60 * 60));
    return new TenantContext(optionsBuilder.Options);
}

此代码导致SqlConnection does not support parallel transactions.
启用时TransactionScope出现此错误:This platform does not support distributed transactions.

在我的ConfigureServices我有

services.AddSingleton<IImportDataService, ImportDataService>();
services.AddTransient <ITenantFactory, TenantFactory>();
services.AddTransient <IShardingService, ShardingService>();

我也试过AddScoped但没有改变。

编辑:附加代码

ShardingService.cs
public async Task<SqlConnection> GetTenantConnectionAsync(int tenantId)
{
    SqlConnection tenantConnection;
    try
    {
        tenantConnection = await _clientShardMap.OpenConnectionForKeyAsync(tenantId, _tenantConnectionString, ConnectionOptions.Validate);
    }
    catch (Exception e)
    {
        _logger.LogDebug($"Error getting tenant connection for key {tenantId}. Error: " + e.Message);
        throw;
    }

    if (tenantConnection == null) throw new ApplicationException($"Cannot get tenant connection for key {tenantId}");
    return tenantConnection;
}

当 WebJob 被触发时,它会从表中读取一条记录。记录的 ID 在队列消息中。在处理数据之前,它首先将状态更改为正在处理,并且在处理数据时将状态更改为已处理或错误:

var fileImport = await _masterContext.FileImports.FindAsync(fileId);
fileImport.Status = Status.Processing;
await _masterContext.SaveChangesAsync();

if (await _fileImportService.ProcessImportFile(fileImport))
                    fileImport.Status = Status.Processed;

await _masterContext.SaveChangesAsync();

标签: c#azure-webjobstransactionscope.net-core-3.1ef-core-3.1

解决方案


推荐阅读