首页 > 技术文章 > Redis 十四 分布式锁

luyShare 2020-12-18 19:39 原文

1.什么是锁?

  多个线程同时想操作同一个资源(数据,对象)时,可能会并发访问的错误,例如都修改同一个数据,这个时候我们需要锁。

  锁就是当一个线程操作一个共享资源时,会独占这个资源,直到释放,其他线程才能操作这个资源,把并行变成了串行。

  单机单线程的情况下不需要锁,单机多线程,考虑到并发情况就需要锁了,通常都是锁住一个堆中的第三方对象,可以看这里,异步多线程安全

2.什么是分布式锁?

  只有在系统是分布式的情况下,才需要用到分布式锁,因为不同的服务器可能会同时处理同一个资源,这种时候单机锁只能锁住自己那台机器,无法避免其他机器同时访问共享资源,这个时候只能去锁一个外界第三方的服务。

  分布式锁就是一个第三方组件,用来处理分布式系统中,多个服务器并行的处理共享资源的情况,保证资源的正确性。

  锁和分布式锁可以同时使用的,这样保证单机只有一个线程会去访问第三方的服务,提高性能,减少IO。

  

 

 

3.如何使用分布式锁

3.1 封装分布式锁

 

    public class RedisLock
    {
        private ConnectionMultiplexer connectionMultiplexer = null;
        private IDatabase database = null;
        private string lockObj = string.Empty;
        public RedisLock()
        {
            connectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:6379");
            database = connectionMultiplexer.GetDatabase(0);
            lockObj = Environment.MachineName;
        }

        public bool AddLock(int productId)
        {
            bool flag = false;
            //如果加锁失败,锁被别人持有,则一直循环加锁
            while (true)
            {
                //redis加锁api
                //锁名
                //锁对象,谁持有这把锁,加锁和解锁的要是同一个线程,避免自己锁被别的线程解掉
                //超时时间,解决死锁问题,超时会自动释放锁,避免一直卡在加锁这一步
                string lockName = productId.ToString();
                flag = database.LockTake(lockName, lockObj, TimeSpan.FromSeconds(30));
                //加锁成功则跳出循环 
                if (flag)
                {
                    //开启新线程不断的设置时间,避免程序还没执行完就超时
                    Task task = Task.Run(() =>
                    {
                        while (true)
                        {
                            Thread.Sleep(5000);
                            string hadLockObj = database.StringGet(lockName).ToString();
                            if (hadLockObj.Equals(lockObj))//表明还是同一个锁未被释放,延长寿命
                            {
                                database.KeyExpire(lockName, TimeSpan.FromSeconds(30));
                            }
                            else
                            {
                                break;
                            }
                        }
                    });
                    break;
                }
                else
                {
                    //防止死循环给系统宕机
                    Thread.Sleep(2000);
                }
            }
            return flag;
        }

        public void DelLock(int productId)
        {
            //不停解锁直到成功
            while (true)
            {
                //redis解锁api
                //锁名
                //锁对象,谁持有这把锁,加锁和解锁的要是同一个线程
                string lockName = productId.ToString();
                bool flag = database.LockRelease(lockName, lockObj);
                if (flag)
                {
                    break;
                }
                else
                {
                    //防止死循环给系统宕机
                    Thread.Sleep(200);
                }
            }
            //关闭资源
            connectionMultiplexer.Dispose();
        }
    }

  这里可以看到,如果加锁失败,别人正持有锁,就会休眠200毫秒,然后再去轮询加锁,这种叫,自旋。

  还有一种方式,就是失败了以后利用订阅发布的功能,订阅一个通知,当其他线程释放锁后,发布通知,唤醒这个线程,再去获取锁,这种叫通知。

 自旋和通知,分布式下自旋方式的IO会比较大,单机下自旋性能较高,但通知也有一个问题,如果上一个拿到锁的线程它挂了,也就是没有释放这一步了,通知也就无法发布,那后面的也获取不到通知。

 

3.2 应用分布式锁

 

        protected void UseRedisLock(int productId)
        {
            RedisLock redisLock = new RedisLock();
            try
            {
                //加锁
                bool flag = redisLock.AddLock(productId);
                //加锁失败
                if(!flag)
                {
                   //失败操作
                }
                //相关业务
                string a = "aaa";
                int b = 101;
            }
            finally
            {
                Thread.Sleep(30000);
                //解锁
                redisLock.DelLock(productId);
            }
        }

 

 

4.红锁

只作用在一个Redis节点上,即使Redis通过哨兵机制保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

在Redis的master节点上拿到了锁;

但是这个加锁的key还没有同步到slave节点;

master故障,发生故障转移,slave节点升级为master节点;

导致锁丢失。

可以使用RedLock,有.net版本RedLock.Net,以后再补。

 https://www.dazhuanlan.com/2019/12/15/5df64c04a4154/

 https://www.cnblogs.com/fanqisoft/p/10942753.html

用Redis中的多个master实例,来获取锁,只有大多数实例获取到了锁,才算是获取成功。具体的红锁算法分为以下五步:

 

获取当前的时间(单位是毫秒)。
使用相同的key和随机值在N个节点上请求锁。这里获取锁的尝试时间要远远小于锁的超时时间,防止某个masterDown了,我们还在不断的获取锁,而被阻塞过长的时间。
只有在大多数节点上获取到了锁,而且总的获取时间小于锁的超时时间的情况下,认为锁获取成功了。
如果锁获取成功了,锁的超时时间就是最初的锁超时时间进去获取锁的总耗时时间。
如果锁获取失败了,不管是因为获取成功的节点的数目没有过半,还是因为获取锁的耗时超过了锁的释放时间,都会将已经设置了key的master上的key删除。

 

 

  

推荐阅读