1.方法入口:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
执行tryAcquireShared方法获取资源,若获取成功则直接返回,若失败,
则进入等待队列,执行自旋获取资源,具体由doAcquireShared方法来实现。
2.过程
(1)tryAcquireShared(int):
tryAcquireShared(int)由继承AQS的自定义同步器来具体实现。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
其返回值为负值代表失败;0代表获取成功,但无剩余资源;正值代表获取成功且有剩余资源,其他线程可去获取。
(2)doAcquireShared(int)
private void doAcquireShared(int arg) {
// 将线程以共享模式添加到等待队列的尾部
final Node node = addWaiter(Node.SHARED);
// 初始化失败标志
boolean failed = true;
try {
// 初始化线程中断标志
boolean interrupted = false;
for (;;) {
// 获取当前节点的前继节点
final Node p = node.predecessor();
// 若前继节点为头结点,则执行tryAcquireShared获取资源
if (p == head) {
int r = tryAcquireShared(arg);
// 若获取资源成功,且有剩余资源,将自己设为头结点并唤醒后续的阻塞线程
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 如果中断标志位为真,则线程执行自我了断
if (interrupted)
selfInterrupt();
// 表征获取资源成功
failed = false;
return;
}
}
// houldParkAfterFailedAcquire(p, node)根据前继节点判断是否阻塞当前节点的线程
// parkAndCheckInterrupt()阻塞当前线程并检查线程是否被中断过,若被中断过,将interrupted置为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 放弃获取资源
cancelAcquire(node);
}
总结:可以发现,doAcquireShared与独占模式下的acquireQueued大同小异,主要有2点不同
第一点:doAcquireShared将线程的自我中断操作放在了方法体内部;
第二点:当线程获取到资源后,doAcquireShared会将当前线程所在的节点设为头结点,
若资源有剩余则唤醒后续节点,比acquireQueued多了个唤醒后续节点的操作。
上述方法体现了共享的本质,即当前线程吃饱了后,若资源有剩余,会招呼后面排队的来一起吃,好东西要大家一起分享嘛,哈哈。
3.具体的函数
下面具体看一下setHeadAndPropagate(Node, int)函数:
private void setHeadAndPropagate(Node node, int propagate) {
// 记录原来的头结点,下面过程会用到
Node h = head;
// 设置新的头结点
setHead(node);
// 如果资源还有剩余,则唤醒后继节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
可以看到,实际执行唤醒后继节点的方法是doReleaseShared(),继续追踪:
private void doReleaseShared() {
// 自旋操作
for (;;) {
// 获取等待队列的头结点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点的线程
unparkSuccessor(h);
}else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head)
// loop if head changed
break;
}
}
4. 释放资源(共享模式)
首先进入到方法入口:
public final boolean releaseShared(int arg) {
// 尝试释放资源
if (tryReleaseShared(arg)) {
// 唤醒后继节点的线程
doReleaseShared();
return true;
}
return false;
}
同样的,tryReleaseShared(int)由继承AQS的自定义同步器来具体实现。
5.总结如何实现队列同步的
整个获取/释放资源的过程是通过传播完成的,如最开始有10个资源,线程A、B、C分别需要5、4、3个资源。
A线程获取到5个资源,其发现资源还剩余5个,则唤醒B线程;
B线程获取到4个资源,其发现资源还剩余1个,唤醒C线程;
C线程尝试取3个资源,但发现只有1个资源,继续阻塞;
A线程释放1个资源,其发现资源还剩余2个,故唤醒C线程;
C线程尝试取3个资源,但发现只有2个资源,继续阻塞;
B线程释放2个资源,其发现资源还剩余4个,唤醒C线程;
C线程获取3个资源,其发现资源还剩1个,继续唤醒后续等待的D线程;
......
6.回顾
如何进行资源的获取和释放(tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared),
需要注意的是,在acquire()和acquireShared()方法中,线程在阻塞过程中均是忽略中断的。
AQS也可以通过acquireInterruptibly()/acquireSharedInterruptibly()来支持线程在等待过程中响应中断。
学习来源:https://www.jianshu.com/p/0f876ead2846