首页 > 解决方案 > 从 Masstransit 中的消息上下文获取故障消息的问题

问题描述

我有一个应用程序需要拦截当前消息使用上下文并提取在基本接口中定义的值。该值是最终在 EF 数据库上下文中使用的租户代码。

我有一个提供者,它采用 MassTransit ConsumerContext,然后使用context.TryGetMessage()提取租户代码,最终用于将数据库上下文切换到特定租户数据库。

问题在于下面的 MessageContextTenantProvider。如果使用了非故障消息,则ConsumeContext<IBaseEvent>可以正常工作。但是,如果是故障,ConsumeContext<Fault<IBaseEvent>>则无法按预期工作。

在调试期间,我可以看到错误的消息上下文是ConsumeContext<Fault<IVerifyEvent>>,但为什么它不能按照标准消息与基本接口一起使用?当然,ConsumeContext<Fault<IVerifiedEvent>>工作正常,但我有很多消息类型,我不想在那个租户提供者中定义它们。

有任何想法吗?

public interface ITenantProvider
{
    string GetTenantCode();
}

public class MessageContextTenantProvider : ITenantProvider
{
    private readonly ConsumeContext _consumeContext;

    public MessageContextTenantProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }

    public string GetTenantCode()
    {
        // get tenant from message context
        if (_consumeContext.TryGetMessage(out ConsumeContext<IBaseEvent> baseEvent))
        {
            return baseEvent.Message.TenantCode; // <-- works for the non fault consumers
        }

        // get tenant from fault message context
        if (_consumeContext.TryGetMessage<Fault<IBaseEvent>>(out var gebericFaultEvent))
        {
            return gebericFaultEvent.Message.Message.TenantCode; // <- doesn't work generically
        }

        // get tenant from fault message context (same as above)
        if (_consumeContext.TryGetMessage(out ConsumeContext<Fault<IBaseEvent>> faultEvent))
        {
            return faultEvent.Message.Message.TenantCode; // <= generically doesn't work when using the base interface?
        }

        // get tenant from specific concrete fault class
        if (_consumeContext.TryGetMessage(out ConsumeContext<Fault<IVerifiedEvent>> verifiedFaultEvent))
        {
            return verifiedFaultEvent.Message.Message.TenantCode; // <-- this works
        }

        // not able to extract tenant
        return null;
    }
}

public partial class VerificationDbContext
{
    string connectionString;

    public string ConnectionString
    {
        get
        {
            if (connectionString == null)
            {
                string tenantCode = _tenantProvider.GetTenantCode();
                connectionString = _tenantConnectionManager.GetConnectionString(orgId);
            }
            return connectionString;
        }
    }

    private readonly ITenantProvider _tenantProvider;

    private readonly ITenantConnectionManager _tenantConnectionManager;

    public VerificationDbContext(ITenantProvider tenantProvider, ITenantConnectionManager tenantConnectionManager)
    {
        _tenantProvider = tenantProvider;
        _tenantConnectionManager = tenantConnectionManager;
    }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        if (string.IsNullOrEmpty(this.ConnectionString))
        {
            optionsBuilder.UseSqlServer(@"Data Source=.\SQLEXPRESS;Initial Catalog=VerificationDb;Integrated Security=True")
                .ConfigureWarnings((warningBuilder) => warningBuilder.Ignore(RelationalEventId.AmbientTransactionWarning));
        }
        else
        {
            optionsBuilder.UseSqlServer(this.ConnectionString)
                .ConfigureWarnings((warningBuilder) => warningBuilder.Ignore(RelationalEventId.AmbientTransactionWarning));
        }
    }
}

public interface ITenantConnectionManager
{
    string GetConnectionString(string tenantCode);
}

public class TenantConnectionManager : ITenantConnectionManager
{
    private ITenantRepository _tenantRepository;

    public TenantConnectionManager(ITenantRepository tenantRepository)
    {
        _tenantRepository = tenantRepository;
    }

    public string GetConnectionString(string tenantCode)
    {
        return _tenantRepository.GetByTenantCode(tenantCode).ConnectionString;
    }
}

public interface IBaseEvent
{
    string TenantCode { get; }
}

public interface IVerifiedEvent : IBaseEvent
{
    string JobReference { get; }
}

public class VerifiedEventConsumer : IConsumer<IVerifiedEvent>
{
    private readonly IVerifyCommand _verifyCommand;

    private readonly ITenantProvider _tenantProvider;

    public VerifiedEventConsumer(ITenantProvider tenantProvider, IVerifyCommand verifyCommand)
    {
        _verifyCommand = verifyCommand;
        _tenantProvider = tenantProvider;
    }

    public async Task Consume(ConsumeContext<IVerifiedEvent> context)
    {
        await _verifyCommand.Execute(new VerifyRequest
        {
            JobReference = context.Message.JobReference,
            TenantCode = context.Message.TenantCode
        });
    }
}

public class VerifiedEventFaultConsumer : IConsumer<Fault<IVerifiedEvent>>
{
    private readonly IVerifyFaultCommand _verifyFaultCommand;

    private readonly ITenantProvider _tenantProvider;

    public CaseVerifiedEventFaultConsumer(ITenantProvider tenantProvider, IVerifyFaultCommand verifyFaultCommand)
    {
        _verifyFaultCommand = verifyFaultCommand;
        _tenantProvider = tenantProvider;
    }

    public async Task Consume(ConsumeContext<Fault<ICaseVerifiedEvent>> context)
    {
        await _verifyFaultCommand.Execute(new VerifiedFaultRequest
        {
            JobReference = context.Message.Message.JobReference,
            Exceptions = context.Message.Exceptions
        });
    }
}

标签: masstransit

解决方案


我已经通过使用 GreenPipes TryGetPayload 扩展方法解决了这个问题:

public class MessageContextTenantProvider : ITenantProvider
{
    private readonly ConsumeContext _consumeContext;

    public MessageContextTenantProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }

    public string GetTenantCode()
    {
        // get tenant from message context
        if (_consumeContext.TryGetMessage(out ConsumeContext<IBaseEvent> baseEvent))
        {
            return baseEvent.Message.TenantCode;
        }

        // get account code from fault message context using Greenpipes
        if (_consumeContext.TryGetPayload(out ConsumeContext<Fault<IBaseEvent>> payloadFaultEvent))
        {
            return payloadFaultEvent.Message.Message.TenantCode;
        }

        // not able to extract tenant
        return null;
    }
}

推荐阅读