首页 > 技术文章 > CountDownLatch 源码解析—— countDown()

cuglkb 2018-04-01 13:56 原文

上一篇文章从源码层面说了一下CountDownLatch 中 await() 的原理。这篇文章说一下countDown() 。

public void countDown() { //CountDownLatch
    sync.releaseShared(1);
}
    ↓
public final boolean releaseShared(int arg) { //AQS
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
    ↓
protected boolean tryReleaseShared(int releases) { //CountDownLatch.Sync 
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
    

通过构造器 CountDownLatch end = new CountDownLatch(2);  state 被设置为2,所以c == 2,nextc = 2-1, 

然后通过下面这个CAS操作将state设置为1。

  protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

此时nextc还不为0,返回false。一直等到countDown()  方法被调用两次,state == 0,nextc ==0,此时返回true。

进入doReleaseShared()方法。

doReleaseShared();
    ↓
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

回顾一下此时的等待队列模型。

       +--------------------------+   prev           +------------------+
head   | waitStatus = Node.SIGNAL | <---- node(tail) | currentThread    |
       +--------------------------+                  +------------------+

此时head 不为null,也不为tail,waitStatus == Node.SIGNAL,所以进入 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 这个判断。

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    ↓
 /**
 * CAS waitStatus field of a node.
 */
private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}

这个CAS 操作将 state 设置为 0 ,也就是说此时Head 中的 waitStatus 是0.此时队列模型如下所示

       +----------------+   prev           +------------------+
head   | waitStatus = 0 | <---- node(tail) | currentThread    |
       +----------------+                  +------------------+

该方法返回true。进入unparkSuccessor(h);

unparkSuccessor(h);
    ↓
private void unparkSuccessor(Node node) {
   /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling.  It is OK if this
    * fails or if status is changed by waiting thread.
    */
   int ws = node.waitStatus;
   if (ws < 0)
       compareAndSetWaitStatus(node, ws, 0);

   /*
    * Thread to unpark is held in successor, which is normally
    * just the next node.  But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
   Node s = node.next;
   if (s == null || s.waitStatus > 0) {
       s = null;
       for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0)
               s = t;
   }
   if (s != null)
       LockSupport.unpark(s.thread);
}

s 就是head的后继结点,也就是装有当前线程的结点。s != null ,并且s.waitStatus ==0 ,所以进入 LockSupport.unpark(s.thread);

 public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

也就是unlock 被阻塞的线程。裁判被允许吹哨了!

countDown() 的原理就此就非常清晰了,

每执行一次countDown() 方法,state 就是减1,直到state == 0,则开始释放被阻塞在队列中的线程,根据前驱结点中waitStatus的状态,释放后续结点中的线程。

OK,回到上一篇文章的问题,什么时候跳出下面这个循环(await方法中的循环)

for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return;
        }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        throw new InterruptedException();
}

此时state == 0,所以进入 setHeadAndPropagate 方法。

setHeadAndPropagate(node, r);
    ↓
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
    ↓
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

这个方法将head 的后继结点变为head。该方法过后,又将node的next结点设置为null,模型变成下图

       prev                +---------+  next
null <---- node(tail/head) | null    | ----> null
                           +---------+

也就是node head tail 什么的都被置为null,等待GC回收了,这个时候return,跳出了for循环,队列被清空。

下面演示一下整个过程

setHeadAndPropagate(node, r);

           +----------------+  
head(tail) |  waitStatus=0  |
           |  thread =null  |
           +----------------++----------------+            +----------------+
           |   waitStatus=0 |   prev     | waitStatus=0   |
head(tail) |   thread =null | <---- node | currentThread  |
           +----------------+            +----------------++----------------+                  +----------------+
     |   waitStatus=0 |   prev           | waitStatus=0   |
head |   thread =null | <---- node(tail) | currentThread  |
     +----------------+                  +----------------++----------------+                  +----------------+
     |   Node.SIGNAL  |   prev           | waitStatus=0   |
head |   thread =null | <---- node(tail) | currentThread  |
     +----------------+                  +----------------++----------------+                  +----------------+
     |   waitStatus=0 |   prev           | waitStatus=0   |
head |   thread =null | <---- node(tail) | currentThread  |
     +----------------+                  +----------------++----------------+
        prev                | waitStatus=0   | next
null <---- node(tail/head)  | null           | ----> null
                            +----------------+
                            

 

CountDownLatch 的核心就是一个阻塞线程队列,这是由链表构造而成的队列,里面包含thread 和 waitStatus,其中waitStatus说明了后继结点线程状态。

state 是一个非常重要的标志,构造时,设置为对应的n值,如果n != 0,阻塞队列将一直阻塞,除非中断线程。

每次调用countDown()  方法,就是将state-1,而调用await() 方法就是将调用该方法的线程加入到阻塞队列,直到state==0,才能释放线程。

 

推荐阅读