c# - 在使用 Azure 服务总线消息后尝试连接到我的数据库时处理上下文实例错误
问题描述
我正在侦听传入的 Azure 服务总线消息。按照文档并接收消息,我解析消息正文,然后我想连接到我的数据库以编辑条目然后保存。但是我在尝试拨打电话时在下面收到此错误
var ybEvent = await _unitOfWork.Repository<YogabandEvent>().GetEntityWithSpec(spec);
错误
无法访问已释放的上下文实例。此错误的一个常见原因是释放从依赖注入中解析的上下文实例,然后尝试在应用程序的其他地方使用相同的上下文实例。如果您在上下文实例上调用“Dispose”或将其包装在 using 语句中,则可能会发生这种情况。如果你使用依赖注入,你应该让依赖注入容器处理上下文实例。\n对象名称:'DataContext'。
这是使用侦听和获取传入 Azure 消息的方法的完整服务。错误在 MessageHandler() 的最后一行
仅供参考- 如果我删除了 DB 调用中的“等待”,对于已处置的上下文,我仍然会收到相同的错误。
问题- 我该如何解决这个问题?
public class ServiceBusConsumer : IServiceBusConsumer
{
private readonly IConfiguration _config;
private readonly ServiceBusClient _queueClient;
private readonly ServiceBusProcessor _processor;
private readonly IUnitOfWork _unitOfWork;
private readonly IEventConsumer _eventConsumer;
public ServiceBusConsumer(IConfiguration config, IEventConsumer eventConsumer, IUnitOfWork unitOfWork)
{
_config = config;
_unitOfWork = unitOfWork;
_eventConsumer = eventConsumer;
_queueClient = new ServiceBusClient(_config["ServiceBus:Connection"]);
_processor = _queueClient.CreateProcessor(_config["ServiceBus:Queue"], new ServiceBusProcessorOptions());
}
public void RegisterOnMessageHandlerAndReceiveMessages() {
_processor.ProcessMessageAsync += MessageHandler;
_processor.ProcessErrorAsync += ErrorHandler;
_processor.StartProcessingAsync();
}
private async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
JObject jsonObject = JObject.Parse(body);
var eventStatus = (string)jsonObject["EventStatus"];
await args.CompleteMessageAsync(args.Message);
var spec = new YogabandEventWithMessageIdSpecification(args.Message.SequenceNumber);
// error here...
var ybEvent = await _unitOfWork.Repository<YogabandEvent>().GetEntityWithSpec(spec);
// do something then save
}
private Task ErrorHandler(ProcessErrorEventArgs args)
{
var error = args.Exception.ToString();
return Task.CompletedTask;
}
}
这是我的工作单位
public IGenericRepository<TEntity> Repository<TEntity>() where TEntity : class // : BaseEntity
{
if(_repositories == null)
_repositories = new Hashtable();
var type = typeof(TEntity).Name;
if (!_repositories.ContainsKey(type))
{
var repositoryType = typeof(GenericRepository<>);
var repositoryInstance = Activator.CreateInstance(repositoryType.MakeGenericType(typeof(TEntity)), _context);
_repositories.Add(type, repositoryInstance);
}
return (IGenericRepository<TEntity>) _repositories[type];
}
我试图直接在处理程序中调用我的通用存储库,但仍然失败并出现 dispose 错误。
这是我在处理程序中更改的调用,现在我调用 gen repo 而不是工作单元
var ybEvent = await _eventsRepo.GetEntityWithSpec(spec);
这是我的通用仓库中的 GetEntityWIthSpec()
public async Task<T> GetEntityWithSpec(ISpecification<T> spec)
{
return await ApplySpecification(spec).FirstOrDefaultAsync();
}
private IQueryable<T> ApplySpecification(ISpecification<T> spec)
{
return SpecificationEvaluator<T>.GetQuery(_context.Set<T>().AsQueryable(), spec);
}
仅供参考-这是我启动回购电话的方式
private readonly IGenericRepository<YogabandEvent> _eventsRepo;
then I inject it into the constructor
public ServiceBusConsumer(IConfiguration config, IEventConsumer eventConsumer, IUnitOfWork unitOfWork, IGenericRepository<YogabandEvent> eventsRepo)
{
_config = config;
_unitOfWork = unitOfWork;
_eventConsumer = eventConsumer;
_eventsRepo = eventsRepo;
_queueClient = new ServiceBusClient(_config["ServiceBus:Connection"]);
_processor = _queueClient.CreateProcessor(_config["ServiceBus:Queue"], new ServiceBusProcessorOptions());
}
启动 ServiceBusConsumer 的代码位于 Main()
public static async Task Main(string[] args)
{
var host = CreateHostBuilder(args).Build();
using (var scope = host.Services.CreateScope())
{
var services = scope.ServiceProvider;
var loggerFactory = services.GetRequiredService<ILoggerFactory>();
try
{
// do some work here
// https://stackoverflow.com/questions/48590579/cannot-resolve-scoped-service-from-root-provider-net-core-2
var bus = services.GetRequiredService<IServiceBusConsumer>();
bus.RegisterOnMessageHandlerAndReceiveMessages();
}
catch (Exception ex)
{
var logger = loggerFactory.CreateLogger<Program>();
logger.LogError(ex, "An error occured during migration");
}
}
host.Run();
}
这是我的工作单位
public class UnitOfWork : IUnitOfWork
{
private readonly DataContext _context;
private Hashtable _repositories;
public UnitOfWork(DataContext context)
{
_context = context;
}
public async Task<int> Complete()
{
return await _context.SaveChangesAsync();
}
public void Dispose()
{
_context.Dispose();
}
public IGenericRepository<TEntity> Repository<TEntity>() where TEntity : class // : BaseEntity
{
if(_repositories == null)
_repositories = new Hashtable();
var type = typeof(TEntity).Name;
if (!_repositories.ContainsKey(type))
{
var repositoryType = typeof(GenericRepository<>);
var repositoryInstance = Activator.CreateInstance(repositoryType.MakeGenericType(typeof(TEntity)), _context);
_repositories.Add(type, repositoryInstance);
}
return (IGenericRepository<TEntity>) _repositories[type];
}
}
解决方案
我无法验证这一点,但我遇到了类似的情况,即创建对服务的响应调用实际上处理了我的 UoW 和 datacontext,我遇到了同样的错误
我怀疑这个调用 await args.CompleteMessageAsync(args.Message);
是在两行之间的某处进行处理,(你可以继续在这里跟踪 CompleteMessageAsync调用
这个,并且正在进行很多处理)
为了验证这一点,您可以尝试将该调用推迟到使用存储库保存更改之后。
// await args.CompleteMessageAsync(args.Message); <-- comment this line
var spec = new YogabandEventWithMessageIdSpecification(args.Message.SequenceNumber);
// error here...
var ybEvent = await _unitOfWork.Repository<YogabandEvent>().GetEntityWithSpec(spec);
// do something then save
await args.CompleteMessageAsync(args.Message); // <-- add it here
推荐阅读
- maven - Fail when deploying imported mavenized application to Myeclipse
- flutter - 如何在 Flutter 应用中使用自定义图像
- c - avr-gcc:调用 ISR 后跳转到任意地址
- linux - How to use exec syscall in linux assembler
- java - Java json get value from directory
- python - Search google contact using People Api v3?
- python - What is the little box with an A in pyqtgraph/PyQt Viewbox with a little box and an A inside and how to get rid of it?
- relational-algebra - How to select multiple attributes along with grouping operation in Relational Algebra Queries?
- wordpress - WordPress website's index.php show unusual code
- function - Creating an automatic and self-generating weekday data entry sheet in google sheets