首页 > 解决方案 > 将 SignalR 集线器与 Redis 一起使用时如何跨负载均衡器服务器跨越 ConcurrentDictionary

问题描述

我有使用 Redis 横向扩展 SignalR 的 ASP.NET Core Web 应用程序设置。使用内置组可以正常工作:

Clients.Group("Group_Name");

并在多个负载均衡器中幸存下来。我假设 SignalR 会自动将这些组保存在 Redis 中,因此所有服务器都知道我们拥有哪些组以及订阅了哪些组。

但是,在我的情况下,我不能只依赖组(或用户),因为没有办法将 connectionId(比如重载 OnDisconnectedAsync 并且只有连接 id 已知)映射回其组,你总是需要Group_Name 来标识组。我需要它来确定组的哪一部分在线,所以当OnDisconnectedAsync被调用时,我知道这个人属于哪个组,以及他在对话的哪一边。

我做了一些研究,他们都建议(包括 Microsoft Docs)使用类似的东西:

static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;

在集线器本身。

现在,这是一个很好的解决方案(并且是线程安全的),除了它只存在于负载平衡器服务器的一个内存中,而其他服务器具有该字典的不同实例。

问题是,我必须connectionMaps手动坚持吗?以 Redis 为例?

就像是:

public class ChatHub : Hub
{
    static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;

    ChatHub(IDistributedCache distributedCache)
    {

        connectionMaps = distributedCache.Get("ConnectionMaps");
       /// I think connectionMaps should not be static any more.
    }
}

如果是,它是线程安全的吗?如果没有,您能否提出一个更好的负载平衡解决方案?

标签: asp.net-coreredissignalrasp.net-core-signalrconcurrentdictionary

解决方案


为此一直在与同样的问题作斗争。我想出的是在 redis 缓存中保留集合,同时利用 StackExchange.Redis.IDatabaseAsync 和锁来处理并发。不幸的是,这使整个过程同步,但无法完全解决这个问题。

这是我正在做的事情的核心,这会获得一个锁并从缓存中返回一个反序列化的集合


    private async Task<ConcurrentDictionary<int, HubMedia>> GetMediaAttributes(bool requireLock)
        {
            if(requireLock)
            {
                var retryTime = 0;
                try
                {
                    while (!await _redisDatabase.LockTakeAsync(_mediaAttributesLock, _lockValue, _defaultLockDuration))
                    {
                        //wait till we can get a lock on the data, 100ms by default
                        await Task.Delay(100);
                        retryTime += 10;
                        if (retryTime > _defaultLockDuration.TotalMilliseconds)
                        {
                            _logger.LogError("Failed to get Media Attributes");
                            return null;
                        }
                    }
                }
                catch(TaskCanceledException e)
                {
                    _logger.LogError("Failed to take lock within the default 5 second wait time " + e);
                    return null;
                }

            }
            var mediaAttributes = await _redisDatabase.StringGetAsync(MEDIA_ATTRIBUTES_LIST);
            if (!mediaAttributes.HasValue)
            {
                return new ConcurrentDictionary<int, HubMedia>();
            }
            return JsonConvert.DeserializeObject<ConcurrentDictionary<int, HubMedia>>(mediaAttributes);
        }

在我完成操作之后像这样更新集合

        private async Task<bool> UpdateCollection(string redisCollectionKey, object collection, string lockKey)
        {
            var success = false;
            try
            {
                success = await _redisDatabase.StringSetAsync(redisCollectionKey, JsonConvert.SerializeObject(collection, new JsonSerializerSettings
                {
                    ReferenceLoopHandling = ReferenceLoopHandling.Ignore
                }));
            }
            finally
            {
                await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
            }
            return success;
        }

当我完成后,我只需确保释放锁以供其他实例抓取和使用


private async Task ReleaseLock(string lockKey)
        {
            await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
        }

如果您找到更好的方法,将很高兴听到。努力寻找任何有关数据保留和共享的横向扩展文档。


推荐阅读