c# - Azure Webjob 中的“SqlConnection 不支持并行事务”
问题描述
我没有在我的 C# .NET Core v3.1 中明确使用带有 EFCore v3 代码的事务,并且一切正常。除了我的 Azure Webjob。它听一个队列。当队列中有多条消息并因此函数被并行调用多次时,我会遇到事务错误。
我的网络作业从存储中读取文件并将内容保存到数据库表中。我也使用了 Sharding 机制:每个客户端都有自己的数据库。
我尝试使用TransactionScope
,但后来出现其他错误。我发现的示例使用TransactionScope
并打开连接并以一种方法进行保存。我将这些部分分成几种方法,使我不清楚如何使用TransactionScope
.
这是一些代码:
ImportDataService.cs:
//using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
await using var tenantContext = await _tenantFactory.GetContextAsync(clientId, true);
await tenantContext.Foo.AddRangeAsync(dboList, cancellationToken);
await tenantContext.SaveChangesAsync(cancellationToken);
//scope.Complete();
TenantFactory.cs:
public async Task<TenantContext> GetContextAsync(int tenantId, bool lazyLoading = false)
{
_tenantConnection = await _sharding.GetTenantConnectionAsync(tenantId);
var optionsBuilder = new DbContextOptionsBuilder<TenantContext>();
optionsBuilder.UseLoggerFactory(_loggerFactory);
if (lazyLoading) optionsBuilder.UseLazyLoadingProxies();
optionsBuilder.UseSqlServer(_tenantConnection,
options => options.MinBatchSize(5).CommandTimeout(60 * 60));
return new TenantContext(optionsBuilder.Options);
}
此代码导致SqlConnection does not support parallel transactions
.
启用时TransactionScope
出现此错误:This platform does not support distributed transactions.
在我的ConfigureServices
我有
services.AddSingleton<IImportDataService, ImportDataService>();
services.AddTransient <ITenantFactory, TenantFactory>();
services.AddTransient <IShardingService, ShardingService>();
我也试过AddScoped
但没有改变。
编辑:附加代码
ShardingService.cs
public async Task<SqlConnection> GetTenantConnectionAsync(int tenantId)
{
SqlConnection tenantConnection;
try
{
tenantConnection = await _clientShardMap.OpenConnectionForKeyAsync(tenantId, _tenantConnectionString, ConnectionOptions.Validate);
}
catch (Exception e)
{
_logger.LogDebug($"Error getting tenant connection for key {tenantId}. Error: " + e.Message);
throw;
}
if (tenantConnection == null) throw new ApplicationException($"Cannot get tenant connection for key {tenantId}");
return tenantConnection;
}
当 WebJob 被触发时,它会从表中读取一条记录。记录的 ID 在队列消息中。在处理数据之前,它首先将状态更改为正在处理,并且在处理数据时将状态更改为已处理或错误:
var fileImport = await _masterContext.FileImports.FindAsync(fileId);
fileImport.Status = Status.Processing;
await _masterContext.SaveChangesAsync();
if (await _fileImportService.ProcessImportFile(fileImport))
fileImport.Status = Status.Processed;
await _masterContext.SaveChangesAsync();
解决方案
推荐阅读
- amazon-web-services - 如何为 Elastic Beanstalk 网址启用 HTTPS
- json - .net core api 究竟返回浏览器的是什么?
- ruby - Gem 自动版本规范,让 gem 名称后的参数为空
- postgresql - 使用从其他表插入返回的 id 更新表
- javascript - TS - 在分配之前使用变量“x”。TS(2454)
- google-sheets - 引用 Connected Google Sheet 的函数可以自动更新吗?
- ios - 在通知服务类中将大 NSString 转换为 NSData
- azure - 如何将作为对 Azure 函数应用中的 powershell Web 请求的响应而收到的文件保存到 Azure 存储容器?
- c# - 为什么视频不能在 mvc 中开始下载?
- sql - 如何在 PostgreSQL 中使用子查询进行更新