首页 > 技术文章 > JAVA并发(7)-并发队列PriorityBlockingQueue的源码分析

ukyu 2021-06-11 00:08 原文

本文讲PriorityBlockingQueue(优先阻塞队列)

1. 介绍

一个无界的具有优先级的阻塞队列,使用跟PriorityQueue相同的顺序规则,默认顺序是自然顺序(从小到大)。若传入的对象,不支持比较将报错( ClassCastException)。不允许null
底层使用的是基于数组的平衡二叉树堆实现(它的优先级的实现)。
公共方法使用单锁ReetrantLock保证线程的安全性。

1.1 类结构

PriorityBlockingQueue类图

  • PriorityBlockingQueue类图

重要的参数


   // 数组的默认大小,会自动扩容的
   private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    // 为啥是减8,一些虚拟机会在数组中保留一些header words(头字), 应该学到jvm时,就知道了
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    private transient Object[] queue;

    ...

    // 如果是空的话,优先级队列就使用元素的自然顺序(从小到大)
    private transient Comparator<? super E> comparator;

保证线程安全的措施


    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     */
    // 为啥没有notFull,因为该队列是无界的
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS.
     */
    // 为啥用CAS,而不是锁,来控制线程安全在扩容时,后面讲
    private transient volatile int allocationSpinLock;

2. 源码剖析

我们知道PriorityBlockingQueue实现了BlockingQueue,这篇博客有提到过BlockingQueue可以看一下,它定义了四种方式,对不能立即满足条件的不同的方法,有不同的处理方式。

我们一起去看看下面几种类型的方法的具体实现

  • 入队
  • 出队

2.1 入队

    public boolean add(E e) {
        return offer(e);
    }

    public void put(E e) {
        offer(e); // never need to block
    }

    // 忽略时间
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

上面几个入队方法都是去调用的offer(e),所以主要来看看这个方法的实现吧

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;

        // 直到扩容成功或溢出为止
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)

                // 二叉堆的插入算法,在后面讲
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

总体步骤很简单,查看是否需要扩容,然后再插入元素到二叉堆里。我们看看扩容的实现

  • 扩容

容量小于64,oldCap + (oldCap + 2); 否则oldCap + (oldCap * 0.5)

      private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;

        // allocationSpinLock默认是0,表示此时没有线程在扩容
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));

                // 检查是否溢出
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }

        // 此时,另一个线程正在扩容;让出自己的CPU时间片,下次再去抢占CPU时间片
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();

        // 重新获取锁
        lock.lock();

        // newArray已经被初始化了
        // 如果queue != array, queue已经被改变了;有两种可能:
        // 1. 已经有元素被出队了
        // 2. 已经有元素入队了,此时入队的线程肯定扩容成功了(在没有其他元素出队的情况下)
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

为什么扩容时,会解锁,并通过CAS去进行新容量的计算?

However, allocation during resizing uses a simple spinlock (used only while not holding main lock) in order to allow takes to operate concurrently with allocation.This avoids repeated postponement of waiting consumers and consequent element build-up.

上面的话,大致意思就是,扩容时使用自旋锁而不是lock,为了在扩容时,也可以执行出队操作(上面的代码中,扩容比较耗费时间)。避免让阻塞的消费者被反复阻塞(被唤醒后,不满足条件,又被阻塞,反复)。
Doug Lea

推荐阅读