首页 > 技术文章 > redis分布式锁

heapStark 2018-01-04 09:34 原文

Jedis总共支持八种调用方式:普通同步方式 ,事务方式(Transactions) ,管道(Pipelining) ,管道中调用事务 ,分布式直连同步调用 ,分布式直连异步调用 ,分布式连接池同步调用,分布式连接池异步调用

事务中某个操作失败,并不会回滚其他操作。

基于数据库的分布式锁:

 

Redis分布式锁:

直接基于Jedis的setnx和expire命令实现的分布式锁都会存在这种缺陷,比如删除key和重新获取key无法保证事务执行,以及未获取到锁的情况下更新到期时间。

Redis官方文档:https://redis.io/topics/distlock 采用Redlock算法实现分布式锁

Java语言对应的实现为:https://github.com/redisson/redisson

基本示例如下所示:

    @Test
    public void redissonTest() throws InterruptedException {
        RedisUtils redisUtils = RedisUtils.getInstance();
        //"192.168.107.53", 6379, 5000, "workorder"
        Config config = new Config();
        config.useSingleServer().setAddress("192.168.107.53:6379").setPassword("workorder");
        RedissonClient redissonClient = redisUtils.getRedisson(config);
        //创建可充入锁
        RLock rLock = redissonClient.getLock("hello");
        //获取锁
        rLock.tryLock(100,100, TimeUnit.SECONDS);
    }

tryLock方法实现如下:

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

基于zookeeper的分布式锁:

代码如下:

    @Test
    public void zkLockTest() throws Exception {
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/blog/test");
        lock.acquire(100, TimeUnit.SECONDS);
        assert (lock.acquire(10, TimeUnit.SECONDS));
        lock.release();
    }

 

推荐阅读