首页 > 解决方案 > 从 ConcurrentDictionary 调用异步回调 - 潜在的死锁?

问题描述

我有一个并发字典,用于管理订阅(以及与其他代码的互连),为了这个示例,显示了一个简化版本。

当添加或删除主题时,我需要更新外部消息总线,而不是添加额外的同步机制并围绕字典添加和消息主题添加订阅包装一个阻塞部分,我使用并发字典并在项目添加成功:

class SubscriptionManager
{
  readonly ConcurrentDictionary<string, SubscribedTypes> subscriptions = new ConcurrentDictionary<string, SubscribedTypes>();

   public void AddDynamic(string topic, Type type, Action<string> dynamicCallback)
   {
      subscriptions
          .AddOrUpdate(
                    topic,
                    (topic) =>
                    {
                        var subscribedTypes = new SubscribedTypes(qos);
                        /* do some work and checks */

                        dynamicCallback(topic); // call back the calling code

                        return subscribedTypes;
                    },
                    (topic, subscribedTypes) =>
                    {
                        /* snip */
                        subscribedTypes.Add(type);
                        return subscribedTypes;
                    });
   }
}

我的消息总线是纯异步的,客户端会调用如下:

subscriptionManager.AddDynamic("v1/health", HealthMessage, async (topic) => await messageBus.Subscribe(topic));

原始问题的更新

匿名 lambda 甚至被称为异步函数调用吗?回答 - 这不是一个好主意。消息总线可以被释放,所以要么通过单例的 IoC 生命周期管理来保证,要么在消息总线内实现 SubscriptionManager 实例

性能在这里至关重要,因为在审查时,一些应用程序会动态创建大量订阅,并且字典将被连续读取。

因此,我设置了一个 ReaderWriterLockSlim 锁,以最大限度地减少读取锁定(升级为写入完全锁定),并且当需要订阅消息总线时,它在临界区之外完成。

public class SubscriptionManager
{
    // SubscribedTypes is a dictionary with some additional features
    private readonly Dictionary<string, SubscribedTypes> dict = new Dictionary<string, SubscribedTypes>();
    private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
    private readonly IMessageBus messageBus;
    
    SubscriptionManager(IMessageBus messageBus)
    {
        this.messageBus = messageBus;
    }

    public async Task AddDynamicAsync(string topic, Type type)
    {
        var subscriptionRequired = false;
        
        WriteLock(() => {
            if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
            {
                // just add - no need to subscribe, is already subscribed when created for first time
                value.DynamicRegistrations.Add(handlerType);                
            }
            else
            {
                // this topic is new, add it
                var subscribedTypes = new SubscribedTypes(qos);
                subscribedTypes.DynamicRegistrations.Add(handlerType);
                this.dict[topic] = subscribedTypes;

                // we will need to register this on external bus
                subscriptionRequired = true;
            }           
        });
        
        if(subscriptionRequired)
            await this.messageBus.Subscribe(topic);
    }
    
    public async Task RemoveDynamicAsync(string topic, Type type)
    {
        var unsubscribeRequired = false;
    
        WriteLock(() => {
            if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
            {
                value.DynamicRegistrations.Remove(handlerType);             
                unsubscribeRequired = true;
            }
            else
            {
                throw new InvalidOperationException($"topic '{topic}' not registered.");
            }       
        });
        
        if(unsubscribeRequired)
            await this.messageBus.Unsubscribe(topic);
    }
    
    public IEnumerable<Type> GetTypesForTopic(string topic)
    {
        var found = ReadLock(() => {
            if(this.dict.TryGetValue(topic, var out SubscribedTypes value))
                return value.GetAll();  
            else
                return new List<Type>();
        });
    }

    private SubscribedTypes ReadLock(Func<SubscribedTypes> func)
    {
        rwLock.EnterReadLock();
        try { return func(); }
        finally { rwLock.ExitReadLock(); }
    }

    private void WriteLock(Action action)
    {
        rwLock.EnterWriteLock();
        try { action(); }
        finally { rwLock.ExitWriteLock(); }
    }               
}

标签: c#concurrencyasync-await

解决方案


您的第一种方法ConcurrentDictionary是有问题的,因为根据文档

如果你AddOrUpdate在不同的线程上同时调用,addValueFactory可能会被调用多次,但它的键/值对可能不会每次调用都添加到字典中。

[...]addValueFactoryupdateValueFactory委托在锁外被调用,以避免在锁下执行未知代码可能出现的问题。因此,AddOrUpdate对于ConcurrentDictionary<TKey,TValue>类上的所有其他操作,它不是原子的。

您使用 的方式ConcurrentDictionary表明您希望每次调用 提供的 lambdas 最多执行一次AddOrUpdate,但这不能保证。

您使用普通Dictionary+的第二种方法lock将无法编译,因为您不允许在 lock block 内等待。您必须要么移动到await messageBus.Subscribe(topic);受保护区域的外部,要么使用像SemaphoreSlim.


推荐阅读