c# - 从 ConcurrentDictionary 调用异步回调 - 潜在的死锁?
问题描述
我有一个并发字典,用于管理订阅(以及与其他代码的互连),为了这个示例,显示了一个简化版本。
当添加或删除主题时,我需要更新外部消息总线,而不是添加额外的同步机制并围绕字典添加和消息主题添加订阅包装一个阻塞部分,我使用并发字典并在项目添加成功:
class SubscriptionManager
{
readonly ConcurrentDictionary<string, SubscribedTypes> subscriptions = new ConcurrentDictionary<string, SubscribedTypes>();
public void AddDynamic(string topic, Type type, Action<string> dynamicCallback)
{
subscriptions
.AddOrUpdate(
topic,
(topic) =>
{
var subscribedTypes = new SubscribedTypes(qos);
/* do some work and checks */
dynamicCallback(topic); // call back the calling code
return subscribedTypes;
},
(topic, subscribedTypes) =>
{
/* snip */
subscribedTypes.Add(type);
return subscribedTypes;
});
}
}
我的消息总线是纯异步的,客户端会调用如下:
subscriptionManager.AddDynamic("v1/health", HealthMessage, async (topic) => await messageBus.Subscribe(topic));
原始问题的更新
匿名 lambda 甚至被称为异步函数调用吗?回答 - 这不是一个好主意。消息总线可以被释放,所以要么通过单例的 IoC 生命周期管理来保证,要么在消息总线内实现 SubscriptionManager 实例
性能在这里至关重要,因为在审查时,一些应用程序会动态创建大量订阅,并且字典将被连续读取。
因此,我设置了一个 ReaderWriterLockSlim 锁,以最大限度地减少读取锁定(升级为写入完全锁定),并且当需要订阅消息总线时,它在临界区之外完成。
public class SubscriptionManager
{
// SubscribedTypes is a dictionary with some additional features
private readonly Dictionary<string, SubscribedTypes> dict = new Dictionary<string, SubscribedTypes>();
private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
private readonly IMessageBus messageBus;
SubscriptionManager(IMessageBus messageBus)
{
this.messageBus = messageBus;
}
public async Task AddDynamicAsync(string topic, Type type)
{
var subscriptionRequired = false;
WriteLock(() => {
if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
{
// just add - no need to subscribe, is already subscribed when created for first time
value.DynamicRegistrations.Add(handlerType);
}
else
{
// this topic is new, add it
var subscribedTypes = new SubscribedTypes(qos);
subscribedTypes.DynamicRegistrations.Add(handlerType);
this.dict[topic] = subscribedTypes;
// we will need to register this on external bus
subscriptionRequired = true;
}
});
if(subscriptionRequired)
await this.messageBus.Subscribe(topic);
}
public async Task RemoveDynamicAsync(string topic, Type type)
{
var unsubscribeRequired = false;
WriteLock(() => {
if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
{
value.DynamicRegistrations.Remove(handlerType);
unsubscribeRequired = true;
}
else
{
throw new InvalidOperationException($"topic '{topic}' not registered.");
}
});
if(unsubscribeRequired)
await this.messageBus.Unsubscribe(topic);
}
public IEnumerable<Type> GetTypesForTopic(string topic)
{
var found = ReadLock(() => {
if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
return value.GetAll();
else
return new List<Type>();
});
}
private SubscribedTypes ReadLock(Func<SubscribedTypes> func)
{
rwLock.EnterReadLock();
try { return func(); }
finally { rwLock.ExitReadLock(); }
}
private void WriteLock(Action action)
{
rwLock.EnterWriteLock();
try { action(); }
finally { rwLock.ExitWriteLock(); }
}
}
解决方案
您的第一种方法ConcurrentDictionary
是有问题的,因为根据文档:
如果你
AddOrUpdate
在不同的线程上同时调用,addValueFactory
可能会被调用多次,但它的键/值对可能不会每次调用都添加到字典中。[...]
addValueFactory
和updateValueFactory
委托在锁外被调用,以避免在锁下执行未知代码可能出现的问题。因此,AddOrUpdate
对于ConcurrentDictionary<TKey,TValue>
类上的所有其他操作,它不是原子的。
您使用 的方式ConcurrentDictionary
表明您希望每次调用 提供的 lambdas 最多执行一次AddOrUpdate
,但这不能保证。
您使用普通Dictionary
+的第二种方法lock
将无法编译,因为您不允许在 lock block 内等待。您必须要么移动到await messageBus.Subscribe(topic);
受保护区域的外部,要么使用像SemaphoreSlim
.
推荐阅读
- jquery - 从排序列表的第一个元素中获取值
- javascript - 按顺序运行类实例的方式(typescript nodejs)
- javascript - 选择文件时Javascript调整图像大小并上传新图像而不是原始图像
- flutter - 如何从小部件树中的多个小部件中重绘特定小部件
- c++ - 在 VS Code 调试器中更好地打印二维数组 - C++
- git - 无法切换到源树中的分支?
- python - Cython 已安装但无法导入
- verilog - 设计有效/就绪握手的方法
- azure-stream-analytics - 如何在流分析中检查 JSON 属性中的空值?
- php - 当他不应该出现404错误时:laravel