首页 > 技术文章 > AQS(获取资源(共享模式) 5)--队列同步器

HuiShouGuoQu 2020-09-01 15:55 原文

1.方法入口:

   public final void acquireShared(int arg) {

            if (tryAcquireShared(arg) < 0)

                 doAcquireShared(arg);

            }

   执行tryAcquireShared方法获取资源,若获取成功则直接返回,若失败,

   则进入等待队列,执行自旋获取资源,具体由doAcquireShared方法来实现。

 

2.过程

   (1)tryAcquireShared(int):

            tryAcquireShared(int)由继承AQS的自定义同步器来具体实现。

            protected int tryAcquireShared(int arg) {

                  throw new UnsupportedOperationException();

            }

          其返回值为负值代表失败;0代表获取成功,但无剩余资源;正值代表获取成功且有剩余资源,其他线程可去获取。

    (2)doAcquireShared(int)

            private void doAcquireShared(int arg) {
            // 将线程以共享模式添加到等待队列的尾部
           final Node node = addWaiter(Node.SHARED);
            // 初始化失败标志
           boolean failed = true;
           try {
                 // 初始化线程中断标志
                boolean interrupted = false;
                for (;;) {
                    // 获取当前节点的前继节点
                   final Node p = node.predecessor();
                   // 若前继节点为头结点,则执行tryAcquireShared获取资源
                  if (p == head) {
                      int r = tryAcquireShared(arg);
                      // 若获取资源成功,且有剩余资源,将自己设为头结点并唤醒后续的阻塞线程
                      if (r >= 0) {
                           setHeadAndPropagate(node, r);
                           p.next = null; // help GC
                           // 如果中断标志位为真,则线程执行自我了断
                          if (interrupted)
                              selfInterrupt();
                           // 表征获取资源成功
                          failed = false;
                          return;
                       }
                  }
                 // houldParkAfterFailedAcquire(p, node)根据前继节点判断是否阻塞当前节点的线程
                // parkAndCheckInterrupt()阻塞当前线程并检查线程是否被中断过,若被中断过,将interrupted置为true
                if (shouldParkAfterFailedAcquire(p, node) &&
                     parkAndCheckInterrupt())
                     interrupted = true;
                }
           } finally {
               if (failed)
                  // 放弃获取资源
                  cancelAcquire(node);
           }

        总结:可以发现,doAcquireShared与独占模式下的acquireQueued大同小异,主要有2点不同

        第一点:doAcquireShared将线程的自我中断操作放在了方法体内部;

       第二点:当线程获取到资源后,doAcquireShared会将当前线程所在的节点设为头结点,

                     若资源有剩余则唤醒后续节点,比acquireQueued多了个唤醒后续节点的操作。

        上述方法体现了共享的本质,即当前线程吃饱了后,若资源有剩余,会招呼后面排队的来一起吃,好东西要大家一起分享嘛,哈哈。

 

3.具体的函数

   下面具体看一下setHeadAndPropagate(Node, int)函数:

     private void setHeadAndPropagate(Node node, int propagate) {
           // 记录原来的头结点,下面过程会用到
           Node h = head;
           // 设置新的头结点
           setHead(node);
    
           // 如果资源还有剩余,则唤醒后继节点
          if (propagate > 0 || h == null || h.waitStatus < 0 ||
              (h = head) == null || h.waitStatus < 0) {
                    Node s = node.next;
                     if (s == null || s.isShared())
                            doReleaseShared();
              }
      }    

      可以看到,实际执行唤醒后继节点的方法是doReleaseShared(),继续追踪:

       private void doReleaseShared() {
           // 自旋操作
           for (;;) {
           // 获取等待队列的头结点
           Node h = head;
           if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;
                            // 唤醒后继节点的线程
                            unparkSuccessor(h);
                      }else if (ws == 0 &&  !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                              continue;       // loop on failed CAS       

                       }
                     if (h == head)                  

                        // loop if head changed
                        break;
                }
          }

 

4. 释放资源(共享模式)

    首先进入到方法入口:

     public final boolean releaseShared(int arg) {
            // 尝试释放资源
            if (tryReleaseShared(arg)) {
                 // 唤醒后继节点的线程
                 doReleaseShared();
                 return true;
             }
          return false;
      }

     同样的,tryReleaseShared(int)由继承AQS的自定义同步器来具体实现。

 

5.总结如何实现队列同步的

   整个获取/释放资源的过程是通过传播完成的,如最开始有10个资源,线程A、B、C分别需要5、4、3个资源。

    A线程获取到5个资源,其发现资源还剩余5个,则唤醒B线程;

    B线程获取到4个资源,其发现资源还剩余1个,唤醒C线程;

    C线程尝试取3个资源,但发现只有1个资源,继续阻塞;

    A线程释放1个资源,其发现资源还剩余2个,故唤醒C线程;

    C线程尝试取3个资源,但发现只有2个资源,继续阻塞;

    B线程释放2个资源,其发现资源还剩余4个,唤醒C线程;

    C线程获取3个资源,其发现资源还剩1个,继续唤醒后续等待的D线程;

    ......

 

6.回顾

   如何进行资源的获取和释放(tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared),

   需要注意的是,在acquire()和acquireShared()方法中,线程在阻塞过程中均是忽略中断的。

   AQS也可以通过acquireInterruptibly()/acquireSharedInterruptibly()来支持线程在等待过程中响应中断。

 

学习来源:https://www.jianshu.com/p/0f876ead2846

 

  

  

推荐阅读