events - 从 RabbitMq 消息中获取租户 ID 以进行 Db 连接
问题描述
我有一个微服务架构,其中包含 ASP.Net Core 应用程序和 RabbitMq 作为微服务之间的事件总线。
我也想支持多租户。
因此,我在中定义了以下依赖注入服务,Startup.cs
以根据用户的租户 ID 在每个请求上打开与数据库的连接。
services.AddScoped<IDocumentSession>(ds =>
{
var store = ds.GetRequiredService<IDocumentStore>();
var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
var tenant = httpContextAccessor?.HttpContext?.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
});
问题在于服务何时处理事件总线消息(如 UserUpdatedEvent)。
在那种情况下,当它尝试打开 Db 连接时,它显然没有来自 http 上下文的用户信息。
在注入作用域服务并使用 RabbitMq 处理事件时,如何发送/访问相应用户的租户 ID?
或者改写我的问题:执行依赖注入代码时,有什么方法可以访问 RabbitMQ 消息(例如它的标头)?
解决方案
既然没有HttpContext
,因为 RabbitMq 请求不是 Http 请求,正如@istepaniuk 的回答中所指出的那样,我创建了自己的上下文并将其命名为AmqpContext
:
public interface IAmqpContext
{
void ClearHeaders();
void AddHeaders(IDictionary<string, object> headers);
string GetHeaderByKey(string headerKey);
}
public class AmqpContext : IAmqpContext
{
private readonly Dictionary<string, object> _headers;
public AmqpContext()
{
_headers = new Dictionary<string, object>();
}
public void ClearHeaders()
{
_headers.Clear();
}
public void AddHeaders(IDictionary<string, object> headers)
{
foreach (var header in headers)
_headers.Add(header.Key, header.Value);
}
public string GetHeaderByKey(string headerKey)
{
if (_headers.TryGetValue(headerKey, out object headerValue))
{
return Encoding.Default.GetString((byte[])headerValue);
}
return null;
}
}
在发送 RabbitMq 消息时,我通过如下标头发送租户 ID:
var properties = channel.CreateBasicProperties();
if (tenantId != null)
{
var headers = new Dictionary<string, object>
{
{ "tid", tenantId }
};
properties.Headers = headers;
}
channel.BasicPublish(exchange: BROKER_NAME,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: body);
然后,当在接收服务上,我将其注册AmqpContext
为范围服务Startup.cs
:
services.AddScoped<IAmqpContext, AmqpContext>();
当接收到 RabbitMq 消息时,在消费者通道中,一个范围和 Amqp 上下文被创建:
consumer.Received += async (model, ea) =>
{
var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body);
var properties = ea.BasicProperties;
using (var scope = _serviceProvider.CreateScope())
{
var amqpContext = scope.ServiceProvider.GetService<IAmqpContext>();
if (amqpContext != null)
{
amqpContext.ClearHeaders();
if (properties.Headers != null && amqpContext != null)
{
amqpContext.AddHeaders(properties.Headers);
}
}
var handler = scope.ServiceProvider.GetService(subscription.HandlerType);
if (handler == null) continue;
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
}
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
然后,当创建作用域 Db 连接服务时(请参阅我的问题),我可以从消息标头访问租户 ID:
services.AddScoped<IDocumentSession>(ds =>
{
var store = ds.GetRequiredService<IDocumentStore>();
string tenant = null;
var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
if (httpContextAccessor.HttpContext != null)
{
tenant = httpContextAccessor.HttpContext.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
}
else
{
var amqpContext = ds.GetRequiredService<IAmqpContext>();
tenant = amqpContext.GetHeaderByKey("tid");
}
return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
});
推荐阅读
- uitableview - iOS13 表格视图控制器顶部有超大填充的问题
- gitlab - Gitlab项目基于模板成员为空
- c# - 如何将 ASP.NET MVC 视图迁移到 Angular UI
- jquery - 如何从多选列表框中删除“全选”选项
- android - 像使用数据库一样使用 .txt 文件
- amazon-web-services - 与数据传输相关的 AWS 账单问题
- vue.js - 单击vuetify中的按钮时如何显示日期选择器?
- javascript - 模态组件使用
- python - 确认这种使用神经网络处理缺失值的方法
- ruby-on-rails - Rails_admin 一键删除几个项目