首页 > 解决方案 > 在使用 Azure 服务总线消息后尝试连接到我的数据库时处理上下文实例错误

问题描述

我正在侦听传入的 Azure 服务总线消息。按照文档并接收消息,我解析消息正文,然后我想连接到我的数据库以编辑条目然后保存。但是我在尝试拨打电话时在下面收到此错误

var ybEvent = await _unitOfWork.Repository<YogabandEvent>().GetEntityWithSpec(spec);

错误

无法访问已释放的上下文实例。此错误的一个常见原因是释放从依赖注入中解析的上下文实例,然后尝试在应用程序的其他地方使用相同的上下文实例。如果您在上下文实例上调用“Dispose”或将其包装在 using 语句中,则可能会发生这种情况。如果你使用依赖注入,你应该让依赖注入容器处理上下文实例。\n对象名称:'DataContext'。

这是使用侦听和获取传入 Azure 消息的方法的完整服务。错误在 MessageHandler() 的最后一行

仅供参考- 如果我删除了 DB 调用中的“等待”,对于已处置的上下文,我仍然会收到相同的错误。

问题- 我该如何解决这个问题?

public class ServiceBusConsumer : IServiceBusConsumer
{
    private readonly IConfiguration _config;
    private readonly ServiceBusClient _queueClient;
    private readonly ServiceBusProcessor _processor;
    private readonly IUnitOfWork _unitOfWork;
    private readonly IEventConsumer _eventConsumer;

    public ServiceBusConsumer(IConfiguration config, IEventConsumer eventConsumer, IUnitOfWork unitOfWork)
    {
        _config = config;
        _unitOfWork = unitOfWork;
        _eventConsumer = eventConsumer;
        _queueClient = new ServiceBusClient(_config["ServiceBus:Connection"]);
        _processor = _queueClient.CreateProcessor(_config["ServiceBus:Queue"], new ServiceBusProcessorOptions());
    }

    public void RegisterOnMessageHandlerAndReceiveMessages() {
        _processor.ProcessMessageAsync += MessageHandler;
        _processor.ProcessErrorAsync += ErrorHandler;
        _processor.StartProcessingAsync();
    }

    private async Task MessageHandler(ProcessMessageEventArgs args)
    {
        string body = args.Message.Body.ToString();
        JObject jsonObject = JObject.Parse(body);
        var eventStatus = (string)jsonObject["EventStatus"];

        await args.CompleteMessageAsync(args.Message);
        
        var spec = new YogabandEventWithMessageIdSpecification(args.Message.SequenceNumber);
        // error here...
        var ybEvent =  await _unitOfWork.Repository<YogabandEvent>().GetEntityWithSpec(spec);

        // do something then save
    }

    private Task ErrorHandler(ProcessErrorEventArgs args)
    {
        var error = args.Exception.ToString();
        return Task.CompletedTask;
    }
}

这是我的工作单位

public IGenericRepository<TEntity> Repository<TEntity>() where TEntity : class // : BaseEntity
    {
        if(_repositories == null) 
            _repositories = new Hashtable();

        var type = typeof(TEntity).Name;

        if (!_repositories.ContainsKey(type))
        {
            var repositoryType = typeof(GenericRepository<>);
            var repositoryInstance = Activator.CreateInstance(repositoryType.MakeGenericType(typeof(TEntity)), _context);

            _repositories.Add(type, repositoryInstance);
        }

        return (IGenericRepository<TEntity>) _repositories[type];
    }

我试图直接在处理程序中调用我的通用存储库,但仍然失败并出现 dispose 错误。

这是我在处理程序中更改的调用,现在我调用 gen repo 而不是工作单元

var ybEvent = await _eventsRepo.GetEntityWithSpec(spec);

这是我的通用仓库中的 GetEntityWIthSpec()

public async Task<T> GetEntityWithSpec(ISpecification<T> spec)
{
    return await ApplySpecification(spec).FirstOrDefaultAsync();
}
private IQueryable<T> ApplySpecification(ISpecification<T> spec)
    {
        return SpecificationEvaluator<T>.GetQuery(_context.Set<T>().AsQueryable(), spec);
    }

仅供参考-这是我启动回购电话的方式

private readonly IGenericRepository<YogabandEvent> _eventsRepo;

then I inject it into the constructor

public ServiceBusConsumer(IConfiguration config, IEventConsumer eventConsumer, IUnitOfWork unitOfWork, IGenericRepository<YogabandEvent> eventsRepo)
    {
        _config = config;
        _unitOfWork = unitOfWork;
        _eventConsumer = eventConsumer;
        _eventsRepo = eventsRepo;
        _queueClient = new ServiceBusClient(_config["ServiceBus:Connection"]);
        _processor = _queueClient.CreateProcessor(_config["ServiceBus:Queue"], new ServiceBusProcessorOptions());
    }

启动 ServiceBusConsumer 的代码位于 Main()

public static async Task Main(string[] args)
    {
        var host = CreateHostBuilder(args).Build();
        using (var scope = host.Services.CreateScope())
        {
            var services = scope.ServiceProvider;
            var loggerFactory = services.GetRequiredService<ILoggerFactory>();
            try 
            {

                // do some work here


                // https://stackoverflow.com/questions/48590579/cannot-resolve-scoped-service-from-root-provider-net-core-2
                var bus = services.GetRequiredService<IServiceBusConsumer>();
                bus.RegisterOnMessageHandlerAndReceiveMessages();
                
            }
            catch (Exception ex)
            {
                var logger = loggerFactory.CreateLogger<Program>();
                logger.LogError(ex, "An error occured during migration");
            }
        }

        host.Run();
    }

这是我的工作单位

public class UnitOfWork : IUnitOfWork
{
    private readonly DataContext _context;
    private Hashtable _repositories;

    public UnitOfWork(DataContext context)
    {
        _context = context;
    }

    public async Task<int> Complete()
    {
        return await _context.SaveChangesAsync();
    }

    public void Dispose()
    {
        _context.Dispose();
    }

    public IGenericRepository<TEntity> Repository<TEntity>() where TEntity : class // : BaseEntity
    {
        if(_repositories == null) 
            _repositories = new Hashtable();

        var type = typeof(TEntity).Name;

        if (!_repositories.ContainsKey(type))
        {
            var repositoryType = typeof(GenericRepository<>);
            var repositoryInstance = Activator.CreateInstance(repositoryType.MakeGenericType(typeof(TEntity)), _context);

            _repositories.Add(type, repositoryInstance);
        }

        return (IGenericRepository<TEntity>) _repositories[type];
    }
}

标签: c#.net-coreentity-framework-coreobjectdisposedexception

解决方案


我无法验证这一点,但我遇到了类似的情况,即创建对服务的响应调用实际上处理了我的 UoW 和 datacontext,我遇到了同样的错误

我怀疑这个调​​用 await args.CompleteMessageAsync(args.Message);是在两行之间的某处进行处理,(你可以继续在这里跟踪 CompleteMessageAsync调用 这个,并且正在进行很多处理)

为了验证这一点,您可以尝试将该调用推迟到使用存储库保存更改之后。

    // await args.CompleteMessageAsync(args.Message); <-- comment this line
    
    var spec = new YogabandEventWithMessageIdSpecification(args.Message.SequenceNumber);
    // error here...
    var ybEvent =  await _unitOfWork.Repository<YogabandEvent>().GetEntityWithSpec(spec);

    // do something then save
    await args.CompleteMessageAsync(args.Message); // <-- add it here

推荐阅读