asp.net-core - 将 SignalR 集线器与 Redis 一起使用时如何跨负载均衡器服务器跨越 ConcurrentDictionary
问题描述
我有使用 Redis 横向扩展 SignalR 的 ASP.NET Core Web 应用程序设置。使用内置组可以正常工作:
Clients.Group("Group_Name");
并在多个负载均衡器中幸存下来。我假设 SignalR 会自动将这些组保存在 Redis 中,因此所有服务器都知道我们拥有哪些组以及订阅了哪些组。
但是,在我的情况下,我不能只依赖组(或用户),因为没有办法将 connectionId(比如重载 OnDisconnectedAsync 并且只有连接 id 已知)映射回其组,你总是需要Group_Name 来标识组。我需要它来确定组的哪一部分在线,所以当OnDisconnectedAsync
被调用时,我知道这个人属于哪个组,以及他在对话的哪一边。
我做了一些研究,他们都建议(包括 Microsoft Docs)使用类似的东西:
static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;
在集线器本身。
现在,这是一个很好的解决方案(并且是线程安全的),除了它只存在于负载平衡器服务器的一个内存中,而其他服务器具有该字典的不同实例。
问题是,我必须connectionMaps
手动坚持吗?以 Redis 为例?
就像是:
public class ChatHub : Hub
{
static readonly ConcurrentDictionary<string, ConversationInformation> connectionMaps;
ChatHub(IDistributedCache distributedCache)
{
connectionMaps = distributedCache.Get("ConnectionMaps");
/// I think connectionMaps should not be static any more.
}
}
如果是,它是线程安全的吗?如果没有,您能否提出一个更好的负载平衡解决方案?
解决方案
为此一直在与同样的问题作斗争。我想出的是在 redis 缓存中保留集合,同时利用 StackExchange.Redis.IDatabaseAsync 和锁来处理并发。不幸的是,这使整个过程同步,但无法完全解决这个问题。
这是我正在做的事情的核心,这会获得一个锁并从缓存中返回一个反序列化的集合
private async Task<ConcurrentDictionary<int, HubMedia>> GetMediaAttributes(bool requireLock)
{
if(requireLock)
{
var retryTime = 0;
try
{
while (!await _redisDatabase.LockTakeAsync(_mediaAttributesLock, _lockValue, _defaultLockDuration))
{
//wait till we can get a lock on the data, 100ms by default
await Task.Delay(100);
retryTime += 10;
if (retryTime > _defaultLockDuration.TotalMilliseconds)
{
_logger.LogError("Failed to get Media Attributes");
return null;
}
}
}
catch(TaskCanceledException e)
{
_logger.LogError("Failed to take lock within the default 5 second wait time " + e);
return null;
}
}
var mediaAttributes = await _redisDatabase.StringGetAsync(MEDIA_ATTRIBUTES_LIST);
if (!mediaAttributes.HasValue)
{
return new ConcurrentDictionary<int, HubMedia>();
}
return JsonConvert.DeserializeObject<ConcurrentDictionary<int, HubMedia>>(mediaAttributes);
}
在我完成操作之后像这样更新集合
private async Task<bool> UpdateCollection(string redisCollectionKey, object collection, string lockKey)
{
var success = false;
try
{
success = await _redisDatabase.StringSetAsync(redisCollectionKey, JsonConvert.SerializeObject(collection, new JsonSerializerSettings
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore
}));
}
finally
{
await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
}
return success;
}
当我完成后,我只需确保释放锁以供其他实例抓取和使用
private async Task ReleaseLock(string lockKey)
{
await _redisDatabase.LockReleaseAsync(lockKey, _lockValue);
}
如果您找到更好的方法,将很高兴听到。努力寻找任何有关数据保留和共享的横向扩展文档。
推荐阅读
- arrays - Groovy 检查数组包含字符串与文字字符串和连接字符串的工作方式不同
- batch-file - 如何使用 bat 在命令提示符下执行 2 个命令目录行。文件
- python - sqlalchemy:防止关系对象自动添加到会话中
- elasticsearch - 如何在 elasticsearch 上运行自定义 lucene 编解码器?
- android - 如何使网络浏览器可以访问android应用程序
- c# - Github 项目上的错误-“值不能为空”、“未设置内核索引 (1) 处的属性 (particleBuffer)”
- android - 无法应用短信中的上下文
- node.js - 正文解析器不使用 POST-MAN 表单数据
- python - 如何从 Google 新闻 RSS 中抓取 Google 新闻文章内容?
- python-2.7 - 让 scipy 接受丢失的数据