首页 > 解决方案 > 从 RabbitMq 消息中获取租户 ID 以进行 Db 连接

问题描述

我有一个微服务架构,其中包含 ASP.Net Core 应用程序和 RabbitMq 作为微服务之间的事件总线。
我也想支持多租户。
因此,我在中定义了以下依赖注入服务,Startup.cs以根据用户的租户 ID 在每个请求上打开与数据库的连接。

services.AddScoped<IDocumentSession>(ds =>
            {
                var store = ds.GetRequiredService<IDocumentStore>();
                var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
                var tenant = httpContextAccessor?.HttpContext?.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
                return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
            });

问题在于服务何时处理事件总线消息(如 UserUpdatedEvent)。
在那种情况下,当它尝试打开 Db 连接时,它显然没有来自 http 上下文的用户信息。

在注入作用域服务并使用 RabbitMq 处理事件时,如何发送/访问相应用户的租户 ID?

或者改写我的问题:执行依赖注入代码时,有什么方法可以访问 RabbitMQ 消息(例如它的标头)?

标签: eventsasp.net-coredependency-injectionrabbitmqmulti-tenant

解决方案


既然没有HttpContext,因为 RabbitMq 请求不是 Http 请求,正如@istepaniuk 的回答中所指出的那样,我创建了自己的上下文并将其命名为AmqpContext

public interface IAmqpContext
    {
        void ClearHeaders();
        void AddHeaders(IDictionary<string, object> headers);
        string GetHeaderByKey(string headerKey);
    }

    public class AmqpContext : IAmqpContext
    {
        private readonly Dictionary<string, object> _headers;

        public AmqpContext()
        {
            _headers = new Dictionary<string, object>();
        }

        public void ClearHeaders()
        {
            _headers.Clear();
        }

        public void AddHeaders(IDictionary<string, object> headers)
        {
            foreach (var header in headers)
                _headers.Add(header.Key, header.Value);
        }

        public string GetHeaderByKey(string headerKey) 
        {
            if (_headers.TryGetValue(headerKey, out object headerValue))
            {
                return Encoding.Default.GetString((byte[])headerValue);
            }
            return null;
        }
    }

在发送 RabbitMq 消息时,我通过如下标头发送租户 ID:

                    var properties = channel.CreateBasicProperties();
                    if (tenantId != null)
                    {
                        var headers = new Dictionary<string, object>
                        {
                            { "tid", tenantId }
                        };
                        properties.Headers = headers;
                    }

                    channel.BasicPublish(exchange: BROKER_NAME,
                                     routingKey: eventName,
                                     mandatory: true,
                                     basicProperties: properties,
                                     body: body);

然后,当在接收服务上,我将其注册AmqpContext为范围服务Startup.cs

services.AddScoped<IAmqpContext, AmqpContext>();

当接收到 RabbitMq 消息时,在消费者通道中,一个范围和 Amqp 上下文被创建:

consumer.Received += async (model, ea) =>
            {
                var eventName = ea.RoutingKey;
                var message = Encoding.UTF8.GetString(ea.Body);
                var properties = ea.BasicProperties;

                using (var scope = _serviceProvider.CreateScope())
                        {
                            var amqpContext = scope.ServiceProvider.GetService<IAmqpContext>();
                            if (amqpContext != null)
                            {
                                amqpContext.ClearHeaders();
                                if (properties.Headers != null && amqpContext != null)
                                {
                                    amqpContext.AddHeaders(properties.Headers);
                                }
                            }
                            var handler = scope.ServiceProvider.GetService(subscription.HandlerType);
                            if (handler == null) continue;
                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }

                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };

然后,当创建作用域 Db 连接服务时(请参阅我的问题),我可以从消息标头访问租户 ID:

    services.AddScoped<IDocumentSession>(ds =>
    {
        var store = ds.GetRequiredService<IDocumentStore>();
        string tenant = null;
        var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
        if (httpContextAccessor.HttpContext != null)
        {
            tenant = httpContextAccessor.HttpContext.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
        }
        else
        {
            var amqpContext = ds.GetRequiredService<IAmqpContext>();
            tenant = amqpContext.GetHeaderByKey("tid");
        }
        return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
    });

推荐阅读