masstransit - 从 Masstransit 中的消息上下文获取故障消息的问题
问题描述
我有一个应用程序需要拦截当前消息使用上下文并提取在基本接口中定义的值。该值是最终在 EF 数据库上下文中使用的租户代码。
我有一个提供者,它接收 MassTransit 的 ConsumeContext,然后使用context.TryGetMessage()
提取租户代码,最终用于将数据库上下文切换到特定租户数据库。
问题在于下面的 MessageContextTenantProvider。如果使用了非故障消息,则ConsumeContext<IBaseEvent>
可以正常工作。但是,如果是故障,ConsumeContext<Fault<IBaseEvent>>
则无法按预期工作。
在调试期间,我可以看到故障消息的上下文是ConsumeContext<Fault<IVerifiedEvent>>
,但为什么它不能按照标准消息与基本接口一起使用?当然,ConsumeContext<Fault<IVerifiedEvent>>
工作正常,但我有很多消息类型,我不想在那个租户提供者中定义它们。
有任何想法吗?
/// <summary>
/// Abstraction over whatever source supplies the tenant code for the current unit of work.
/// </summary>
public interface ITenantProvider
{
/// <summary>Gets the tenant code.</summary>
/// <returns>The tenant code as a string.</returns>
string GetTenantCode();
}
/// <summary>
/// Resolves the tenant code from the message carried by the MassTransit consume context
/// passed to the constructor.
/// </summary>
// This is the question's version of the provider: it probes the context for several
// message shapes in turn. The trailing remarks record the behavior the author reported
// for each probe; they are observations from the question, not guarantees.
public class MessageContextTenantProvider : ITenantProvider
{
// Consume context supplied through the constructor; every probe below goes through it.
private readonly ConsumeContext _consumeContext;
/// <summary>Stores the consume context that <see cref="GetTenantCode"/> will inspect.</summary>
public MessageContextTenantProvider(ConsumeContext consumeContext)
{
_consumeContext = consumeContext;
}
/// <summary>
/// Returns the TenantCode of the first message shape that the consume context yields.
/// </summary>
/// <returns>The tenant code, or null when none of the probes succeeds.</returns>
public string GetTenantCode()
{
// Probe 1: a regular (non-fault) message exposed through the base event interface.
if (_consumeContext.TryGetMessage(out ConsumeContext<IBaseEvent> baseEvent))
{
return baseEvent.Message.TenantCode; // reported to work for the non-fault consumers
}
// Probe 2: a fault wrapping the base event interface (explicit generic argument form).
if (_consumeContext.TryGetMessage<Fault<IBaseEvent>>(out var gebericFaultEvent))
{
return gebericFaultEvent.Message.Message.TenantCode; // reported NOT to match fault messages generically
}
// Probe 3: the same lookup as probe 2, written with an explicitly typed out variable.
if (_consumeContext.TryGetMessage(out ConsumeContext<Fault<IBaseEvent>> faultEvent))
{
return faultEvent.Message.Message.TenantCode; // reported NOT to match when the base interface is used
}
// Probe 4: a fault wrapping one specific event type.
if (_consumeContext.TryGetMessage(out ConsumeContext<Fault<IVerifiedEvent>> verifiedFaultEvent))
{
return verifiedFaultEvent.Message.Message.TenantCode; // reported to work, but needs one probe per event type
}
// No probe matched: the tenant could not be extracted.
return null;
}
}
/// <summary>
/// Part of the verification DbContext that picks the SQL Server connection string
/// for the current tenant.
/// </summary>
public partial class VerificationDbContext
{
    // Cached tenant connection string; remains null until a lookup has produced a value.
    string connectionString;

    private readonly ITenantProvider _tenantProvider;
    private readonly ITenantConnectionManager _tenantConnectionManager;

    /// <summary>
    /// Creates the context with the services used to resolve the tenant and its connection string.
    /// </summary>
    /// <param name="tenantProvider">Supplies the current tenant code.</param>
    /// <param name="tenantConnectionManager">Maps a tenant code to a connection string.</param>
    public VerificationDbContext(ITenantProvider tenantProvider, ITenantConnectionManager tenantConnectionManager)
    {
        _tenantProvider = tenantProvider;
        _tenantConnectionManager = tenantConnectionManager;
    }

    /// <summary>
    /// Connection string for the current tenant. It is looked up on first access and the
    /// result is reused afterwards; while the lookup yields null it is retried on each access.
    /// </summary>
    public string ConnectionString
    {
        get
        {
            if (connectionString == null)
            {
                string tenantCode = _tenantProvider.GetTenantCode();

                // Look up by the tenant code resolved above. The original passed an
                // undeclared identifier (`orgId`) here and left `tenantCode` unused.
                connectionString = _tenantConnectionManager.GetConnectionString(tenantCode);
            }

            return connectionString;
        }
    }

    /// <summary>
    /// Configures SQL Server with the tenant connection string, falling back to the local
    /// default database when no tenant connection string is available.
    /// </summary>
    /// <param name="optionsBuilder">Builder used to configure this context.</param>
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        const string fallbackConnectionString = @"Data Source=.\SQLEXPRESS;Initial Catalog=VerificationDb;Integrated Security=True";

        string tenantConnectionString = this.ConnectionString;
        string effectiveConnectionString = string.IsNullOrEmpty(tenantConnectionString)
            ? fallbackConnectionString
            : tenantConnectionString;

        // Both branches of the original differed only in the connection string, so the
        // provider setup and the warning suppression are written once.
        optionsBuilder.UseSqlServer(effectiveConnectionString)
            .ConfigureWarnings((warningBuilder) => warningBuilder.Ignore(RelationalEventId.AmbientTransactionWarning));
    }
}
/// <summary>
/// Maps a tenant code to the database connection string for that tenant.
/// </summary>
public interface ITenantConnectionManager
{
/// <summary>Gets the connection string associated with a tenant.</summary>
/// <param name="tenantCode">Code identifying the tenant.</param>
/// <returns>The connection string for the given tenant code.</returns>
string GetConnectionString(string tenantCode);
}
/// <summary>
/// <see cref="ITenantConnectionManager"/> backed by a tenant repository.
/// </summary>
public class TenantConnectionManager : ITenantConnectionManager
{
    // Assigned once in the constructor; readonly to match the other injected fields in this file.
    private readonly ITenantRepository _tenantRepository;

    /// <summary>Creates the manager over the given tenant repository.</summary>
    /// <param name="tenantRepository">Repository queried for tenant records.</param>
    public TenantConnectionManager(ITenantRepository tenantRepository)
    {
        _tenantRepository = tenantRepository;
    }

    /// <summary>
    /// Returns the ConnectionString of the tenant record the repository yields for the code.
    /// </summary>
    /// <param name="tenantCode">Code identifying the tenant.</param>
    /// <returns>The connection string stored on the tenant record.</returns>
    // NOTE(review): if GetByTenantCode can return null for an unknown code, this
    // dereference throws - confirm the repository contract.
    public string GetConnectionString(string tenantCode)
    {
        return _tenantRepository.GetByTenantCode(tenantCode).ConnectionString;
    }
}
/// <summary>
/// Base contract shared by the event messages; carries the tenant the event belongs to.
/// </summary>
public interface IBaseEvent
{
/// <summary>Code of the tenant this event belongs to.</summary>
string TenantCode { get; }
}
/// <summary>
/// Event contract that extends <see cref="IBaseEvent"/> with a job reference.
/// </summary>
public interface IVerifiedEvent : IBaseEvent
{
/// <summary>Reference of the job this event relates to.</summary>
string JobReference { get; }
}
/// <summary>
/// Consumes <see cref="IVerifiedEvent"/> messages and forwards them to the verify command.
/// </summary>
public class VerifiedEventConsumer : IConsumer<IVerifiedEvent>
{
    private readonly ITenantProvider _tenantProvider;
    private readonly IVerifyCommand _verifyCommand;

    /// <summary>Creates the consumer with its injected collaborators.</summary>
    /// <param name="tenantProvider">Tenant provider; stored but not read by this class.</param>
    /// <param name="verifyCommand">Command executed for each consumed event.</param>
    public VerifiedEventConsumer(ITenantProvider tenantProvider, IVerifyCommand verifyCommand)
    {
        _tenantProvider = tenantProvider;
        _verifyCommand = verifyCommand;
    }

    /// <summary>
    /// Copies the job reference and tenant code from the message into a
    /// <c>VerifyRequest</c> and executes the verify command with it.
    /// </summary>
    /// <param name="context">Consume context carrying the verified event.</param>
    public async Task Consume(ConsumeContext<IVerifiedEvent> context)
    {
        var verifiedEvent = context.Message;

        var request = new VerifyRequest
        {
            JobReference = verifiedEvent.JobReference,
            TenantCode = verifiedEvent.TenantCode
        };

        await _verifyCommand.Execute(request);
    }
}
/// <summary>
/// Consumes faults raised for <see cref="IVerifiedEvent"/> messages and forwards them
/// to the verify-fault command.
/// </summary>
public class VerifiedEventFaultConsumer : IConsumer<Fault<IVerifiedEvent>>
{
    private readonly IVerifyFaultCommand _verifyFaultCommand;
    private readonly ITenantProvider _tenantProvider;

    /// <summary>Creates the consumer with its injected collaborators.</summary>
    /// <param name="tenantProvider">Tenant provider; stored but not read by this class.</param>
    /// <param name="verifyFaultCommand">Command executed for each consumed fault.</param>
    // The constructor must carry the class name; the original was named
    // CaseVerifiedEventFaultConsumer, which is not a valid constructor for this class.
    public VerifiedEventFaultConsumer(ITenantProvider tenantProvider, IVerifyFaultCommand verifyFaultCommand)
    {
        _verifyFaultCommand = verifyFaultCommand;
        _tenantProvider = tenantProvider;
    }

    /// <summary>
    /// Copies the job reference of the faulted message and the fault's exceptions into a
    /// <c>VerifiedFaultRequest</c> and executes the verify-fault command with it.
    /// </summary>
    /// <param name="context">Consume context carrying the fault for a verified event.</param>
    // The parameter type has to match the implemented IConsumer<Fault<IVerifiedEvent>>;
    // the original used Fault<ICaseVerifiedEvent>, leaving the interface unimplemented.
    public async Task Consume(ConsumeContext<Fault<IVerifiedEvent>> context)
    {
        await _verifyFaultCommand.Execute(new VerifiedFaultRequest
        {
            JobReference = context.Message.Message.JobReference,
            Exceptions = context.Message.Exceptions
        });
    }
}
解决方案
我已经通过使用 GreenPipes TryGetPayload 扩展方法解决了这个问题:
/// <summary>
/// Resolves the tenant code from the MassTransit consume context passed to the constructor,
/// looking at both regular events and fault messages.
/// </summary>
public class MessageContextTenantProvider : ITenantProvider
{
    private readonly ConsumeContext _consumeContext;

    /// <summary>Stores the consume context that <see cref="GetTenantCode"/> will inspect.</summary>
    public MessageContextTenantProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }

    /// <summary>
    /// Reads the tenant code from the consumed message, or from the faulted message when
    /// the context carries a fault.
    /// </summary>
    /// <returns>The tenant code, or null when neither lookup succeeds.</returns>
    public string GetTenantCode()
    {
        string tenantCode = null;

        if (_consumeContext.TryGetMessage<IBaseEvent>(out var eventContext))
        {
            // Regular message exposed through the base event interface.
            tenantCode = eventContext.Message.TenantCode;
        }
        else if (_consumeContext.TryGetPayload(out ConsumeContext<Fault<IBaseEvent>> faultContext))
        {
            // Fault message: tenant code of the wrapped event, obtained through the
            // GreenPipes payload lookup (the approach the accompanying answer settled on).
            tenantCode = faultContext.Message.Message.TenantCode;
        }

        return tenantCode;
    }
}
推荐阅读
- amazon-web-services - 为第 3 方身份提供商创建 AWS IAM 策略/角色
- python - 在机器学习中有不可训练的参数可以吗?
- angular - 使用 ALT 键时,Angular HostListener 无法按预期工作
- reactjs - 将数据传递给具有不同数据结构但具有相同命名字段的 GatsbyJS 模板
- java - 错误:无法找到具有逻辑名称的列 - 未找到参考列
- html - WordPress:将对角飞溅横幅插入 OnePress Hero 部分
- python - pathlib.Path(path).mkdir(),不创建目录
- css - Luma 主题 Magento 2 和促销标签
- git - 包锁更改时更新节点模块
- javascript - 如果没有其他方法,如何比较不使用的时间