首页 > 技术文章 > 并发队列

Zzzzn 2020-03-23 16:11 原文

并发队列

  在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。

  

一、ConcurrentLinkedQueue

  ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue,它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,该队列不允许null元素;

  ConcurrentLinkedQueue重要方法:

    add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中这两个方法没有任务区别);

    poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会;

package com.zn.queueTest;

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueTest {
    public static void main(String[] args) {
        //准备队列
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        //存放数据
        queue.offer("张三");
        queue.offer("李四");
        queue.offer("王五");

        //获取队列中数据个数
        System.out.println("队列中当前有:"+queue.size()+"个数据~");
        //获取队列中头数据  poll()方法相当于消费了队列中的数据,队列数据会随之删除
        System.out.println("获取队列中的数据:"+queue.poll());
        System.out.println("队列中当前有:"+queue.size()+"个数据~");
        //获取队列中数据,但是不会删除
        System.out.println("获取队列中的数据:"+queue.peek());
        System.out.println("获取队列中的数据:"+queue.peek());
        System.out.println("队列中当前有:"+queue.size()+"个数据~");

    }
}

控制台效果:

  

二、BlockingQueue

  阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作是:

    在队列为空时,获取元素的线程会等待队列变为非空;

    当队列满时,存储元素的线程会等待队列可用;

  阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器拿元素;

  在java中,BlockingQueue的接口位于java.util.concurrent包中,阻塞队列是线程安全的;

  在新增呢的concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题,通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利;

  常用的队列主要由以下两种:

    先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能,从某种程度上来说这种队列也体现了一种公平性;

    后进后出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件;

1.ArrayBlockingQueue

  ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组,有边界意思就是它的容量是有限的,我们必须在其初始化的时候执行它的容量大小,容量大小一旦执行就不可改变;

  ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移除的对象是头部;

package com.zn.queueTest;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrays=new ArrayBlockingQueue<String>(3);
        arrays.add("张三");
        arrays.add("李四");
        arrays.add("王五");

        //添加阻塞队列
        arrays.offer("赵六",1, TimeUnit.SECONDS);

        //poll方法相当于消费了队列中的数据,队列的数据就会删除
        System.out.println(arrays.poll());
        System.out.println(arrays.poll());
        System.out.println(arrays.poll());
        System.out.println(arrays.poll());
    }
}

控制台效果:

  

 如果先出队一条数据,此时被阻塞的数据就可以添加进来:

package com.zn.queueTest;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueTest {
    //出队一条数据
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrays=new ArrayBlockingQueue<String>(3);
        arrays.add("张三");
        arrays.add("李四");
        arrays.add("王五");
        //出队一条数据
        System.out.println(arrays.poll());

        //添加阻塞队列
        arrays.offer("赵六",1, TimeUnit.SECONDS);

        //poll方法相当于消费了队列中的数据,队列的数据就会删除
        System.out.println(arrays.poll());
        System.out.println(arrays.poll());
        System.out.println(arrays.poll());
    }
}

控制台效果:

  

2.LinkedBlockingQueue

  LinkedBlockingQueue阻塞队列大小的配置时可选的,如果我们初始化时指定大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE容量,它的内部是一个链表;

  和ArrayBlockingQueue一样,LinkedBlockingQueue也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移除的对象是头部;

package com.zn.queueTest;

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue linkedBlockingQueue=new LinkedBlockingQueue(3);
        linkedBlockingQueue.add("A");
        linkedBlockingQueue.add("B");
        linkedBlockingQueue.add("C");
        System.out.println(linkedBlockingQueue.poll());
        System.out.println(linkedBlockingQueue.size());
    }
}

控制台效果:

  

3.PriorityBlockingQueue

  PriorityBlockingQueue是一个没有边界的队列,它的排序规则和java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中国允许插入null对象;

  所有插入PriorityBlockingQueue的对象必须实现java.lang.Comparable接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的;

  另外,我们可以从PriorityBlockingQueue获得一个迭代器Iterator,但这个迭代器并不保证按照优先级顺序进行迭代;

package com.zn.queueTest;

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<String> priorityBlockingQueue=new PriorityBlockingQueue<String>(3);
        priorityBlockingQueue.add("AA");
        priorityBlockingQueue.add("BB");
        priorityBlockingQueue.add("CC");
        System.out.println(priorityBlockingQueue.poll());
        System.out.println(priorityBlockingQueue.size());
    }
}

控制台效果:

   

4.SynchronousQueue

  SynchronousQueue队列内部仅容纳一个元素,当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费;

 

推荐阅读