首页 > 技术文章 > redission分布式锁实战

jerry0612 2021-03-10 12:20 原文

为什么要引入分布式锁?

分布式锁是用来解决分布式或集群场景中的并发冲突的一种常用手段。

分布式锁和传统jvm中的synchronized、ReentrantLock有什么区别?

分布式锁:解决分布式或集群场景下多个线程的并发竞争。(多进程多线程)

synchronized、ReentrantLock:只能解决单体应用中多个线程的并发竞争。(单进程多线程)

案例及问题分析

  • 创建springboot应用

1

点击next,进入下一步

2

继续点击next,选择springboot版本,然后finish创建完成。

  • pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.good.study</groupId>
    <artifactId>springboot-redis-lock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-redis-lock</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

目录结构如下:

3

resources目录下,创建application.yml文件

server:
  port: 8080
spring:
  redis:
    database: 0
    timeout: 6000
    # redis密码
    password: qwert321123
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 1000
    # 集群环境配置    
    cluster:
      nodes:
        - 192.168.124.136:7001
        - 192.168.124.136:7002
        - 192.168.124.136:7003
        - 192.168.124.137:7004
        - 192.168.124.137:7005
        - 192.168.124.137:7006

创建一个IndexController,内容如下:

@RestController
public class IndexController {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @GetMapping("/test1")
    public void test1() {
        synchronized (this) {
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock -1;
     			// 提前将stock的值设置到redis,redis客户端执行 set stock 50
                redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }
    }
}

思考:上面代码是否会有问题?

如果应用只是部署在单机节点下,那么上面代码确实是没有什么问题的,但是如果是部署在多个机器节点下,那么问题就严重了。假设上面代码部署在多个机器节点下,会出现什么问题呢?我们先来看一张图:

5

从上图可以看到,应用部署在web1和web2,加入了nginx来做负载均衡,提供给前端、移动端的地址也将会是nginx的地址,假如有两个请求同时访问nignx,第一个请求,到达了web1,第二个次请求到达了web2,两次请求都是同一时刻到达的,通过上面代码,同时获取到的值都是50,那么就会出现库存超扣的情况。

废话不多说,来演示一下你就明白了。

首先配置nginx,配置如下:

#user  nobody;

worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;
	client_max_body_size 1024M;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

	upstream stocklock {
		server 127.0.0.1:8080 weight=1;
		server 127.0.0.1:9090 weight=1;
	}
	
    server {
        listen       80;
        server_name  127.0.0.1;
		
		proxy_set_header X-Forwarded-Host $host;
        proxy_set_header X-Forwarded-Server $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;
		location / {
			proxy_pass http://stocklock;
			proxy_connect_timeout 600;
			proxy_read_timeout 600;
		}
    }

}

启动应用

6

修改端口号为9090,再启动第二个应用,但是要先做一下配置,如下图所示:

7

点击Edit Configurations进入设置

8

勾选Allow parallel run,然后点击Apply,再次点击ok。

9

两个应用都启动完成,然后启动nginx,接下来,我们还需要借助一个工具:jmeter

10

点击jmeter.bat启动,关于jmeter使用,不做过多赘述。

创建一个线程组

11

这里设置的200,是指开启200个线程进行压测,设置的0,是指同一时刻进行访问。

添加一个HTTP request ,压测nginx的地址。

12

13

点击运行,观察java控制台。

14

再观察第二个应用的日志

15

两边日志对比,同时出现了49,47,46,44,43,这就是上面所说的出现库存超扣的问题。

代码如何改进?

代码又做了一下改进,具体代码如下:

@GetMapping("/test2")
public void test2() {
	synchronized (this) {
		String stockKey = "product_1001";
		try {
            // ①.不存在key则设置,存在直接返回
			Boolean result = redisTemplate.opsForValue().setIfAbsent(stockKey, "1");
			if (!result) {
				return;
			}
            // ②.设置过期时间
			redisTemplate.expire(stockKey, 10, TimeUnit.SECONDS);
			int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
			if (stock > 0) {
				int realStock = stock -1;
				redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
				System.out.println("扣减成功,剩余库存:" + realStock);
			} else {
				System.out.println("扣减失败,库存不足");
			}
		} finally {
            // ③.删除key(释放锁)
			redisTemplate.delete(stockKey);
		}
	}
}

代码看上去是没什么问题,但是实际坑还是有的,①和②的过程中,如果出现网络故障或者redis宕机了,那么第一个请求进来的获取到锁了,第二个请求进来时,发现这个key存在了,直接return了。③也是有问题的,如果第一个请求进来,代码还没执行完,第二个请求进来了,假如第二个请求的执行时间比第一个请求的执行时间快,第二个请求执行到了③,把第一个请求的key删掉了,就会导致锁失效。

又进一步做了改进,代码如下:

@GetMapping("/test3")
public void test3() {
	synchronized (this) {
		String stockKey = "product_1001";
		String requestId = UUID.randomUUID().toString();
		try {
			Boolean result = redisTemplate.opsForValue().setIfAbsent(stockKey, requestId, 10, TimeUnit.SECONDS);
			if (!result) {
				return;
			}
			
			int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
			if (stock > 0) {
				int realStock = stock -1;
				redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
				System.out.println("扣减成功,剩余库存:" + realStock);
			} else {
				System.out.println("扣减失败,库存不足");
			}
		} finally {
			if (requestId.equals(redisTemplate.opsForValue().get(stockKey))) {
				redisTemplate.delete(stockKey);
			}
		}
	}
}

其实做到这一步,已经算没什么问题了,但是对于大型互联网来说,还是会出现一些问题,具体是什么问题,大家可以自己思考一下,或者自己动手实践一下。

更进一步的解决方案

其实现在很多开源框架已经解决了上面的问题,这个开源框架就是redission,没听过redission的小伙伴可以去官网看官方文档:https://redisson.org/

  • pom文件添加redission依赖包
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.11.2</version>
        </dependency>
  • 配置RedissionProperties
@Component
@ConfigurationProperties(prefix = "spring.redis")
public class RedisConfigProperties {
    private String password;
    private cluster cluster;

    public static class cluster {
        private List<String> nodes;

        public List<String> getNodes() {
            return nodes;
        }

        public void setNodes(List<String> nodes) {
            this.nodes = nodes;
        }
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public RedisConfigProperties.cluster getCluster() {
        return cluster;
    }

    public void setCluster(RedisConfigProperties.cluster cluster) {
        this.cluster = cluster;
    }
}
  • 配置RedissionConfig
@Configuration
public class RedissonConfig {
    @Autowired
    private RedisConfigProperties redisConfigProperties;

    //添加redisson的bean
    @Bean
    public Redisson redisson() {
        //redisson版本是3.5,集群的ip前面要加上“redis://”,不然会报错,3.2版本可不加
        List<String> clusterNodes = new ArrayList<>();
        for (int i = 0; i < redisConfigProperties.getCluster().getNodes().size(); i++) {
            clusterNodes.add("redis://" + redisConfigProperties.getCluster().getNodes().get(i));
        }
        Config config = new Config();
        ClusterServersConfig clusterServersConfig = config.useClusterServers()
                .addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
        clusterServersConfig.setPassword(redisConfigProperties.getPassword());//设置密码
        return (Redisson) Redisson.create(config);
    }
}
redission实现分布式锁原理

19

加锁机制

如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。

发送lua脚本到redis服务器上,脚本如下:

"if (redis.call('exists',KEYS[1])==0) then "+       --看有没有锁
	"redis.call('hset',KEYS[1],ARGV[2],1) ; "+       --无锁 加锁  
	"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+      
	"return nil; end ;" +
"if (redis.call('hexists',KEYS[1],ARGV[2]) ==1 ) then "+  --我加的锁
		"redis.call('hincrby',KEYS[1],ARGV[2],1) ; "+  --重入锁
		"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+  
	"return nil; end ;" +
"return redis.call('pttl',KEYS[1]) ;"  --不能加锁,返回锁的时间

lua的作用:保证这段复杂业务逻辑执行的原子性。
lua的解释:
KEYS[1]) : 加锁的key
ARGV[1] : key的生存时间,默认为30秒
ARGV[2] : 加锁的客户端ID (UUID.randomUUID()) + “:” + threadId)
第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就
进行加锁。如何加锁呢?很简单,用下面的命令:

hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加
锁。

接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。

锁互斥机制

如果客户端2来尝试加锁,执行了同样的一段lua脚本,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。

接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不
是的,因为那里包含的是客户端1的ID。

所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时
间。比如还剩15000毫秒的生存时间。
此时客户端2会进入一个while循环,不停的尝试加锁。

自动延时机制

只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一
下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

可重入锁机制

第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。
第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是
“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此时就会执行可重入加锁的逻辑,他会用:
incrby myLock
8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
通过这个命令,对客户端1的加锁次数,累加1。

释放锁机制

执行lua脚本如下:

#如果key已经不存在,说明已经被解锁,直接发布(publish)redis消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
          "end;" +
# key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。 不是我加的锁 不能解锁
          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
          "end; " +
# 将value减1
          "local counter = redis.call('hincrby', KEYS[1], ARGV[3],
-1); " +
# 如果counter>0说明锁在重入,不能删除key
          "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
# 删除key并且publish 解锁消息
          "else " +
            "redis.call('del', KEYS[1]); " + #删除锁
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; "+
          "end; " +
          "return nil;",

– KEYS[1] :需要加锁的key,这里需要是字符串类型。
– KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:
“redisson_lockchannel{” + getName() + “}”
– ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合
redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
– ARGV[2] :锁的超时时间,防止死锁
– ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId
如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。
其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。
如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:
“del myLock”命令,从redis里删除这个key。
然后呢,另外的客户端2就可以尝试完成加锁了。

redission使用

具体代码如下:

@RestController
public class IndexController {

    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private Redisson redisson;

    @GetMapping("/test1")
    public void test1() {
        synchronized (this) {
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock -1;
                redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }
    }

    @GetMapping("/test2")
    public void test2() {
        synchronized (this) {
            String stockKey = "product_1001";
            String requestId = UUID.randomUUID().toString();
            try {
                Boolean result = redisTemplate.opsForValue().setIfAbsent(stockKey, requestId);
                if (!result) {
                    return;
                }
                redisTemplate.expire(stockKey, 10, TimeUnit.SECONDS);
                int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int realStock = stock -1;
                    redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                    System.out.println("扣减成功,剩余库存:" + realStock);
                } else {
                    System.out.println("扣减失败,库存不足");
                }
            } finally {
                redisTemplate.delete(stockKey);
            }
        }
    }

    @GetMapping("/test3")
    public void test3() {
        synchronized (this) {
            String stockKey = "product_1001";
            String requestId = UUID.randomUUID().toString();
            try {
                Boolean result = redisTemplate.opsForValue().setIfAbsent(stockKey, requestId, 10, TimeUnit.SECONDS);
                if (!result) {
                    return;
                }

                int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int realStock = stock -1;
                    redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                    System.out.println("扣减成功,剩余库存:" + realStock);
                } else {
                    System.out.println("扣减失败,库存不足");
                }
            } finally {
                if (requestId.equals(redisTemplate.opsForValue().get(stockKey))) {
                    redisTemplate.delete(stockKey);
                }
            }
        }
    }

    @GetMapping("/test4")
    public void test4() {
        String stockKey = "product_1001";
        RLock lock = redisson.getLock(stockKey);
        try {
            // 加锁
            lock.lock();
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock -1;
                redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
}

重新启动两个应用,再把redis里的stock恢复到50,进行测试。

注意:测试的时候把压测工具里的接口路径由test1改为test4

16

17

18

对比两边的日志,没有再出现上面的库存超扣问题了,好了,这就是redission实现的分布式锁,主要用来解决

分布式场景下数据并发竞争和库存超卖等应用场景。对redission分布式锁实现感兴趣的小伙伴,也可以自己看看底层加锁和释放锁的逻辑,以及锁续命的具体实现。

相关技术:https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers/#81-lock

推荐阅读