AQS
概述
AQS全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点:
-
用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁,其它什么等待队列、条件队列都是AQS已经维护好的。
getState\setState
- 设置\获取 state 状态
compareAndSetState
- cas 机制设置 state 状态- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
-
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
-
条件变量Condition来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
结构:
// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;
// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;
// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;
// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer
使用:
前面说了,AQS是用来帮我们快速构建同步器或者锁的,因此我们要做的就是在它已经定义好的结构之下对某些具体方法(state的获取释放)进行实现即可,注意,结构是不变的
子类主要实现这样一些方法
-
tryAcquire\tryRelease
尝试获取\释放锁 -
isHeldExclusively
如果同步是以独占方式进行的,则返回true
;其它情况则返回fals
Exclusive
独占
自定义同步器
final class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
if (acquires == 1){
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int acquires) {
if(acquires == 1) {
if(getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
return false;
}
protected Condition newCondition() {
return new ConditionObject();
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
自定义锁
有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁
class MyLock implements Lock {
static MySync sync = new MySync();
@Override
// 尝试,不成功,进入等待队列
public void lock() {
sync.acquire(1);
}
@Override
// 尝试,不成功,进入等待队列,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
// 尝试一次,不成功返回,不进入队列
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
// 尝试,不成功,进入等待队列,有时限
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
// 释放锁
public void unlock() {
sync.release(1);
}
@Override
// 生成条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
详解
AQS的设计理念完全是围绕state状态、队列设计来的,其余的思想无非是对当前状态的修改、条件控制线程的阻塞与唤醒从而实现高效的锁控制,这就构成了一个同步器。因此,其核心设计逃不开这三板斧,即:
AQS采用了设计模式中的模板方式模式。暴露出tryAcquire
和tryRelease
等方法由子类来实现。即在其运行流程中有调用这些方法,但是这些方法其实还没实现,需要子类来重写实现。
注意,需要我们子类实现的是tryXxx
尝试锁的方法,因为acquire
和release
直接进行锁操作(加入队列、修改锁的持有线程)的方法是AQS已经实现并且已经定义为final
了,上源码
//加锁源码
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//释放锁源码
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//这里只贴出来独占锁的代码,至于能否重入的具体实现就体现在tryXxx()代码中了
//在尝试获取锁的时候,我们可以判断state状态是否为0,表示是否锁被占用,那么
//如果我们想实现可重入,思路就来了,再判断当前持有锁的线程是否为当前线程即可
//共享锁的话,由于锁对象可以被多个线程持有,思路不太一样
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//也是需要我们实现tryAcquireShared
//跟独占锁相比,共享锁的主要特征在于当一个在等待队列中的共享节点成功获取到锁以后(它获取到的是共享锁)
//既然是共享,那它必须要依次唤醒后面所有可以跟它一起共享当前锁资源的节点,
//毫无疑问,这些节点必须也是在等待共享锁
//(这是大前提,如果等待的是独占锁,那前面已经有一个共享节点获取锁了,它肯定是获取不到的)。
由此引出了AQS的三大关键操作:
同步器的状态变更、线程阻塞和释放、插入和移出队列
它们对应着我们了解AQS必须掌握的三个基本组件:
- 同步器状态的原子性管理
- 线程阻塞与解除阻塞
- 队列的管理
同步器状态的原子性管理
AQS类使用 state 来保存同步状态,并暴露出getState
、setState
以及compareAndSetState
操作来读取和更新这个同步状态(即子类实现)。
/**
* The synchronization state.
*/
private volatile int state;
state 设计
- state 使用 volatile 配合 cas 保证其修改时的原子性
- state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
我们使用AQS来创建自己的同步器、锁,无非就是实现对 state 的管理控制的具体方法,因此对 state 的运用非常重要,此外 state 还是我们实现可重入的依据,即可重入时 state 值是可以增加的,释放就减。
线程阻塞与解除阻塞
阻塞与恢复设计:
- 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume那么 suspend 将感知不到
- 解决方法是使用 park & unpark 来实现线程的暂停和恢复
- 什么?你说为什么不用wait/notify?第一,它俩是要锁对象来调用的方法,并且唤醒还是随机唤醒的,怎么用?退一万步说,我都用上synchronized对象锁了,那还搞这些同步器干嘛?
- park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细,并且还可以通过 interrupt 打断,持有锁的线程释放锁后就可以使用 unpark 唤醒下一个抢夺锁的线程,新线程获取锁失败加入阻塞队列后也会调用 park 让自己休息。
所以接下来介绍一下park/unpark
暂停当前线程 | 恢复某个线程的运行 |
---|---|
LockSupport.park(); | LockSupport.unpark(暂停线程对象); |
- park 中的线程,处于 Waiting状态
- unpark 既可以在 park 之前调用也可以之后调用,都是用来恢复某个线程的运行,简单的说,调用 unpark 后再调用 park 线程依然不会暂停,类似提前“解毒”
- 听说park/unpark这样能解决线程顺序问题,例如wait/notify顺序颠倒就会出现某一个未被唤醒就一直等待,但是无论先还是后park我们都不会一直阻塞
队列的管理
Node节点:
既然都提到队列了,自然是离不开队列里的Node节点构造
static final class Node {
// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;
// 下面的几个int常量是给waitStatus节点状态用的
// 代码此线程取消了争抢这个锁
static final int CANCELLED = 1;
// 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
// CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁
static final int CONDITION = -2;
// 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
static final int PROPAGATE = -3;
// 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 这个就是线程本尊
volatile Thread thread;
}
Node 的数据结构其实就是 thread
waitStatus
pre
next
四个属性而已
同步(阻塞)队列:
整个框架的核心就是如何管理线程阻塞队列,该队列是严格的FIFO队列,因此不支持线程优先级的同步。同步队列的最佳选择是自身没有使用底层锁来构造的非阻塞数据结构,因此阻塞队列的移除与添加都使用了CAS。
队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 waitState 维护节点状态,注意,虽然节点都是连在一起的,但是 head 节点是不算在阻塞队列里的,因为它已经持有资源了
前面我们提到了加锁的源代码
//加锁源码
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
逐步分析addWaiter
和acquireQueued
方法
addWaiter
将当前线程加入阻塞队列的源代码
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队。
enq(node);
return node;
}
enq
将节点加入阻塞队列队尾的源代码
private Node enq(final Node node) {
//这里就是经典空循环CAS自旋
for (;;) {
Node t = tail;//读取当前的尾部节点
if (t == null) { // 如果当前尾结点为空,就证明还没有初始化
//创建一个新空节点作为头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//尾结点不为空,将节点插入到尾部
node.prev = t;
//将当前节点作为尾结点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//注意这为什么没讲compareAndSetHead和compareAndSetTail方法
//因为这两个方法真就是见名知意,里面调用的unsafe.compareAndSwapObject方法
因为遵循FIFO规则,所以能成功获取到AQS同步状态的必定是首节点,首节点的线程在释放同步状态时,会唤醒后续节点,而后续节点会在获取AQS同步状态成功的时候将自己设置为首节点。
acquireQueued
在阻塞队列中等待获取资源的机会
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否成功拿到资源
try {
boolean interrupted = false;//标记等待过程中是否被中断过
//又是一个“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驱
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
failed = false; // 成功获取资源
return interrupted;//返回等待过程中是否被中断过
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}
所以我们可以知道acquire
加锁这一方法的整体流程:
- 调用我们子类实现的
tryAcquire
方法尝试CAS获取锁状态state,如果获取成功就直接返回,代表加锁成功 - 如果
tryAcquire
失败,那就addWaiter()
将线程添加到阻塞队列,并标记独占模式 acquireQueued
使节点在队列中休息park
,有机会就CAS获取资源(到自己会被前一个节点unpark
)- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断
selfInterrupt()
,将中断补上。
至于release
释放锁,步骤也类似,这里就不贴源码了,走一下流程:
- 调用我们子类实现的
tryRelease
来释放资源 - 成功就调用
unparkSuccessor
来唤醒等待队列的下一个线程
条件队列:
作为一个同步器,对线程的各种控制例如阻塞、唤醒等操作是避免不了的,那么我们该怎么实现这些功能呢?前面说到了park\unpark方法,但是仅仅依靠这两个方法来实现一个完善的同步器显然是不够的,因此AQS还存在一个内部类,即ConditionObject,该类对这两个方法进行了一次封装,形成await()和signal()方法,更加灵活,可以创建多个条件变量,每个条件变量维护一个条件队列,实现对线程的精确控制。
我们先来看看在ReentrantLock中怎么使用条件队列Condition的
public class ConditionDemo {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
/**
* 和Object对象的wait/notify只能操作一个monitor中的waitset不同
* condition可以实现有多少个condition就能创建多少个队列,所以其实condition也就是条件队列,用来进行线程间协调通信
*/
Condition hongCondition = lock.newCondition();
Condition lanCondition = lock.newCondition();
Condition heiCondition = lock.newCondition();
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 100, TimeUnit.MINUTES, new LinkedBlockingDeque<>(), new ThreadPoolExecutor.AbortPolicy());
executor.execute(()->{
System.out.println("这里是小红!");
lock.lock();
System.out.println("我要阻塞了,希望小黑唤醒我");
try {
hongCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我被唤醒了,我要唤醒小蓝");
lanCondition.signal();
lock.unlock();
});
executor.execute(()->{
System.out.println("这里是小蓝!");
lock.lock();
System.out.println("我要阻塞了,希望小红唤醒我");
try {
lanCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我被唤醒了,你们都醒了吗?");
lock.unlock();
});
executor.execute(()->{
System.out.println("这里是小黑!");
lock.lock();
System.out.println("小红醒一醒,我来唤醒你");
hongCondition.signal();
lock.unlock();
});
executor.shutdown();
}
}
从上面我们可以看出(看得出个der,举的什么例子,太烂了),相对于synchronized对象锁调用wait()方法将线程阻塞到同一个WaitSet,我们使用condition能够在同一个锁的情况下,将争夺锁的线程给阻塞到不同的队列并分别唤醒。
ConditionObject的await()方法源码
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//新建一个Node.CONDITION节点放到条件队列最后面
Node node = addConditionWaiter();
//释放当前线程获取的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//调用park()方法阻塞挂起当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//条件队列移除第一个节点,然后把这个节点丢到阻塞队列中,然后激活这个线程
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
两种队列关系梳理
- 当多个线程调用lock.lock()方法的时候,只有一个线程获取到锁,其他的线程都会被转为Node节点丢到AQS的阻塞队列中,并做CAS自旋获取锁
- 当获取到锁的线程对应的条件变量的await()方法被调用的时候,该线程就会释放锁,并把当前线程转为Node节点放到条件变量对应的条件队列中
- 这个时候AQS的阻塞队列中又会有一个节点中的线程能得到锁了,如果这个线程又恰巧调用了对应条件变量的await()方法时,又会重复2的步骤,然后阻塞队列中又会有一个节点中的线程获得锁
- 然后,又有一个线程调用了条件变量的signal()或者signalAll()方法,就会把条件队列中一个或者所有的节点都移动到AQS阻塞队列中,然后调用unpark方法进行授权,就等着获得锁了
仔细想想,这不就是和对象锁的逻辑一样么?
一个锁对应一个阻塞队列,但是对应多个条件变量,每一个条件变量对应一个条件队列。其中,这两种队列中存放的都是Node节点,Node节点中封装了线程及其状态。