首页 > 技术文章 > AQS源码分析

mobiwusihuan288 2020-10-28 11:41 原文

学习地址:https://www.bilibili.com/video/BV1Hy4y1B78T?p=7

可重入锁(又名为递归锁)

是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。

Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。

可:可以。

重:再次。

入:进入

​ 进入同步域(即同步代码块/方法或显式锁锁定的代码)

锁:同步锁

一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入。

自己可以获取自己的内部锁

可重入锁种类

隐式锁(即synchronized关键字使用的锁)默认是可重入锁

// * 在一个synchronized修饰的方法或代码块的内部
// * 调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的

同步块

public class ReEnterLockDemo {

    static Object objectLockA = new Object();

    public static void m1() {
        new Thread(() -> {
            synchronized (objectLockA) {
                System.out.println(Thread.currentThread().getName() + "\t" + "------外层调用");
                synchronized (objectLockA) {
                    System.out.println(Thread.currentThread().getName() + "\t" + "------中层调用");
                    synchronized (objectLockA) {
                        System.out.println(Thread.currentThread().getName() + "\t" + "------内层调用");
                    }
                }
            }
        }, "t1").start();

    }

    public static void main(String[] args) {
        m1();
    }
}

同步方法

public class ReEnterLockDemo2 {

    public synchronized void m1() {
        System.out.println("=====外层");
        m2();
    }

    public synchronized void m2() {
        System.out.println("=====中层");
        m3();
    }

    public synchronized void m3() {
        System.out.println("=====内层");
    }


    public static void main(String[] args) {
        new ReEnterLockDemo().m1();
    }
}

每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。

当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。

在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么Java虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。

当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。

显式锁(即Lock)也有ReentrantLock这样的可重入锁。

public class ReEnterLockDemo3 {

    static Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        new Thread(() -> {
            lock.lock();
            //lock.lock();
            try {
                System.out.println("=======外层");
                lock.lock();
                try {
                    System.out.println("=======内层");
                } finally {
                    lock.unlock();
                }
            } finally {
                //实现加锁次数和释放次数不一样
                //由于加锁次数和释放次数不一样,第二个线程始终无法获取到锁,导致一直在等待。
                lock.unlock();
                //lock.unlock();    //正在情况,加锁几次就要解锁几次
            }
        }, "t1").start();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("b thread----外层调用lock");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "b").start();
        
    }
}

线程等待唤醒机制

方式1: notify()&&wait()

使用Object中的wait()方法让线程等待, 使用Object中的notify()方法唤醒线程

/**
 * 要求: t1线程等待3秒钟,3秒钟后t2线程唤醒t1线程继续工作
 * 以下异常情况:
 * 2 wait方法和notify方法,两个都去掉同步代码块后看运行效果
 * 2.1 异常惰况
 * Exception in thread "t1" java.Lang.ILlegalLNonitorStateException at java.lang.Object.wait(Native Method)
 * Exception in thread "t2" java.lang.ILlegalWonitorStateException at java.lang.Object.notify(Native Method)
 * <p>
 * 2.2 结论
 * Object类中的wait、notify、notifyALlL用于线程等待和唤醒的方法,都必须在synchronized内部执行(必须用到关键字synchronized)
 * <p>
 * 3 将notify放在wait方法前先执行,t1先notify 了,3秒钟后t2线程再执行wait方法
 * 3.1程序一直无法结柬
 * 3.2结论
 * 先wait后notify、notifyall方法,等待中的线程才会被唤醒,否则无法唤醒
 */
public class LockSupportDemo {

    static Object objectLock = new Object();

    public static void main(String[] args) {
        new Thread(() -> {
            /*try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/

            synchronized (objectLock) {
                System.out.println(Thread.currentThread().getName() + "\t" + "------come in");
                try {
                    objectLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + "------被唤醒");
            }
        }, "A").start();

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            synchronized (objectLock) {
                objectLock.notify();
                System.out.println(Thread.currentThread().getName() + "\t" + "------通知");
            }
        }, "B").start();

    }
}

wait方法和notify方法,两个都去掉同步代码块

image-20201027161503594


Exception in thread "A" Exception in thread "B" java.lang.IllegalMonitorStateException


将notify放在wait方法前面

image-20201027162438661


程序无法执行,无法唤醒

小结:

wait和notify方法必须要在同步块或者方法里面且成对出现使用

先wait后notify才OK

方式2: signal()&&await()

使用JUC包中Condition的await()方法让线程等待,使用signal()方法唤醒线程

public class LockSupportDemo2 {
    static Lock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();

    public static void main(String[] args) {

        new Thread(() -> {
            /*try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "\t" + "------come in");
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + "------被唤醒");
            } finally {
                lock.unlock();
            }
        }, "A").start();

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            lock.lock();
            try {
                condition.signal();
                System.out.println(Thread.currentThread().getName() + "\t" + "------通知");
            } finally {
                lock.unlock();
            }
        }, "B").start();

    }
}

去掉lock.lock()和lock.unlock()

image-20201027163423589


Exception in thread "A" java.lang.IllegalMonitorStateException

Exception in thread "B" java.lang.IllegalMonitorStateException


将signal放在await前

image-20201027163747444

程序无法执行,无法唤醒

小结:

signal和await方法必须要在Lock锁块中

先signal后await才OK

传统的synchronized和Lock实现等待唤醒通知的约束

  • 线程先要获得并持有锁,必须在锁块(synchronized或lock)中
  • 必须要先等待后唤醒,线程才能够被唤醒

方式3: LockSupport

LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程


image-20201027160834295


LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。

LockSupport中的park()和unpark()的作用分别是阻塞线程和解除阻塞线程


image-20201027164504205


LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit),permit只有两个值1和零,默认是零。

可以把许可看成是一种(0,1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1。

主要方法


image-20201027164751508


阻塞park()/park(Object blocker)

阻塞当前线程/阻塞传入的具体线程

image-20201027164857247


permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park方法会被唤醒,然后会将permit再次设置为0并返回。

唤醒unpark(Thread thread)

唤醒处于阻断状态的指定线程

image-20201027165027151


调用unpark(thread)方法后,就会将thread线程的许可permit设置成1(注意多次调用unpark方法,不会累加,permit值还是1)会自动唤醒thread线程,即之前阻塞中的LockSupport.park()方法会立即返回。

/**
 * LockSupport:俗称 锁中断
 * 以前的两种方式:
 * 1.以前的等待唤醒通知机制必须synchronized里面有一个wait和notify
 * 2.lock里面有await和signal
 * 这上面这两个都必须要持有锁才能干,
 * LockSupport它的解决的痛点
 * 1.LockSupport不用持有锁块,不用加锁,程序性能好,
 * 2.先后顺序,不容易导致卡死
 */
public class LockSupportDemo3 {

    public static void main(String[] args) {

        Thread t1 = new Thread(() -> {
            /*try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }*/
            System.out.println(Thread.currentThread().getName() + "\t" + "------come in");
            System.out.println(Thread.currentThread().getName() + "\t" + "------come in" + System.currentTimeMillis());
            LockSupport.park();//阻塞当前线程
//            LockSupport.park();//阻塞当前线程
            System.out.println(Thread.currentThread().getName() + "\t" + "------被唤醒");
            System.out.println(Thread.currentThread().getName() + "\t" + "------被唤醒" + System.currentTimeMillis());
        }, "t1");
        t1.start();

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception e) {
            e.printStackTrace();
        }

        Thread t2 = new Thread(() -> {
            LockSupport.unpark(t1);
//            LockSupport.unpark(t1);
            System.out.println(Thread.currentThread().getName() + "\t" + "------通知");
        }, "t2");
        t2.start();
        
    }
}

不需要锁块

先执行unpark()再park()

image-20201027165554948


没有卡死

sleep方法3秒后醒来,执行park()无效,没有阻塞:先执行unpark(t1)导致上面的park()方法形同虚设无效,时间一致


小结

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语

LockSupport是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞之后也有对应的唤醒方法。归根结底,LockSupport调用的Unsafe中的native代码。

image-20201027170036196


LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞的过程

LockSupport和每个使用它的线程都有一个许可(permit)关联。permit相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit,也就是将1变成0,同时park立即返回。

如再次调用park会变成阻塞(因为permit为零了会阻塞在这里,一直到permit变为1),这时调用unpark会把permit置为1。

每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累凭证。

形象的理解

线程阻塞需要消耗凭证(permit),这个凭证最多只有1个。

当调用park方法时
如果有凭证,则会直接消耗掉这个凭证然后正常退出;
如果无凭证,就必须阻塞等待凭证可用;

而unpark则相反,它会增加一个凭证,但凭证最多只能有1个,累加无效。

面试题
为什么可以先唤醒线程后阻塞线程?

因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。

为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?

因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样,只会增加一个凭证;
而调用两次park却需要消费两个凭证,证不够,不能放行。


image-20201027170622164


AQS

AbstractQueuedSynchronizer抽象的队列同步器

前置知识

公平锁和非公平锁

可重入锁

LockSupport

自旋锁

数据结构之链表

设计模式之模板设计模式

image-20201027171045002


是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石, 通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量,表示持有锁的状态。

image-20201027171149596


AQS是JUC内容中最重要的基石

image-20201027171312836


ReentrantLock

image-20201027171417681


CountDownLatch

image-20201027171608185


ReentrantReadWriteLock

image-20201027171718707


Semaphore

image-20201027171747937


进一步理解锁和同步器的关系

锁,面向锁的使用者

​ 定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可。

同步器,面向锁的实现者

​ 比如Java并发大神Douglee,提出统一规 范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。


有阻塞就需要排队,实现排队必然需要有某种形式的队列来进行管理

抢到资源的线程直接使用办理业务,抢占不到资源的线程的必然涉及一种排队等候机制,抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及LockSuport.park()的方式,维护state变量的状态,使并发达到同步的效果。


image-20201027173057023


AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的 FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成 一个Node节点来实现锁的分配,通过CAS完成对State值的修改。


AQS内部体系架构

image-20201027173644432


image-20201027173910775


AQS自身

AQS的int变量

​ AQS的同步状态State成员变量

image-20201027174005616

类比:银行办理业务的受理窗口状态

​ 零就是没人,自由状态可以办理

​ 大于等于1,有人占用窗口,等着去


AQS的CLH队列

​ CLH队列(三个大牛的名字组成),为一个双向队列

image-20201027174144253


类比:银行侯客区的等待顾客

小总结

有阻塞就需要排队,实现排队必然需要队列

state变量+CLH双端Node队列


内部类Node

Node类在AQS类内部)

Node的int变量

Node的等待状态waitState成员变量

image-20201028081555455


等候区其它顾客(其它线程)的等待状态,队列中每个排队的个体就是一个Node.

Node对象

image-20201028082212793


属性说明

image-20201028082334125

AQS同步队列的基本结构

image-20201028083607380


是用LockSupport.pork()来进行排队的

源码解读AQS

从ReentrantLock开始解读AQS

Lock接口的实现类,基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的。

通过ReentrantLock的源码来讲解公平锁和非公平锁

image-20201028084042096


image-20201028084201226


image-20201028084633221


image-20201028084732606


可以明显看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()

hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法

image-20201028085330571


对比公平锁和非公平锁的tryAcqure()方法的实现代码, 其实差别就在于非公平锁获取锁时比公平锁中少了一个判断!hasQueuedPredecessors()

hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:

公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;

非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark(), 之后还是需要竞争锁(存在线程竞争的情况下)

image-20201028093953895


image-20201028094020856


非公平锁,方法lock()

public class AQSDemo {
    public static void main(String[] args) {

        ReentrantLock lock = new ReentrantLock();

        //带入一个银行办理业务的案例来模拟我们的AQS如何进行线程的管理和通知唤醒机制

        //3个线程模拟3个来银行网点,受理窗口办理业务的顾客

        //A顾客就是第一个顾客,此时受理窗口没有任何人,A可以直接去办理
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("-----A thread come in");

                try {
                    TimeUnit.MINUTES.sleep(20);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } finally {
                lock.unlock();
            }
        }, "A").start();

        //第二个顾客,第二个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时B只能等待,
        //进入候客区
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("-----B thread come in");
            } finally {
                lock.unlock();
            }
        }, "B").start();

        //第三个顾客,第三个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时C只能等待,
        //进入候客区
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("-----C thread come in");
            } finally {
                lock.unlock();
            }
        }, "C").start();
    }
}

初始状态

img


lock()

image-20201028091602445


第一个线程通过compareAndSetState判断

image-20201028100530696


并设置A当前线程

image-20201028100702174


img


线程B进入acquire(1)

image-20201028101017862


acquire()3个方法

tryAcquire()

image-20201028101230800


典型的模板模式

image-20201028101441135


image-20201028101516792


currnet 为线程B

c 当前状态标志位

currnetc

c==0 刚好此时线程A执行完成,线程B抢占成功

若当前线程 == 正在执行的线程

nextc + 1 ==》 可重入锁

由于线程A的存在,返回false

tryAcquire(arg)的值为false


addWaiter()

image-20201028102013031


static final Node EXCLUSIVE = null;

进入enq(node);

image-20201028103359147


第一次循环

new Node() 添加到 头节点的位置

双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。 真正的第一个有数据的节点,是从第二个节点开始的。

img


第二次循环

将B添加到哨兵节点后面

image-20201028104304845


线程C进入

img


acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

image-20201028105525953


第一次 p 哨兵节点

tryAcquire(arg) 再抢抢

image-20201028110012891


如果前驱节点的waitstatus是SIGNAL状态(-1),即shouldParkAfterFailedAcquire方法会返回true 程序会继续向下执行parkAndCheckInterrupt方法,用于将当前线程挂起

第二次

image-20201028110110770


parkAndCheckInterrupt()

image-20201028110202193


B 节点被阻塞,正在排队等待中

C 节点同理

非公平锁,方法unlock()

image-20201028110804312


image-20201028110817327


image-20201028110850406


image-20201028110924329


模板模式

image-20201028111000643


img


A 执行完成 B 被释放

LockSupport.unpark(s.thread);

image-20201028111321967


B 被唤醒

image-20201028112109207


image-20201028112115145


img


总结

我相信你应该看过源码了,那么AQS里面有个变量叫State,它的值有几种?

3个状态:没占用是0,占用了是1,大于1是可重入锁

如果AB两个线程进来了以后,请问这个总共有多少个Node节点?

答案是3个

推荐阅读