首页 > 技术文章 > JAVA 1.5 并发之 BlockingQueue

rockman12352 2014-06-15 00:27 原文

1.BlockingQueue 顾名思义就是阻塞队列

最经典的使用场合就是 生产者 - 消费者 模型啦,其优点是队列控制已经处理好,用户只需要存(满了会阻塞),取(空了会阻塞)

可以更多的关心核心逻辑,而不是并发控制

PS: BlockingDeque是阻塞双向队列,这里不作讲解,其实意思差不多啦

 

add(E e)       添加元素,如果失败则抛出exception

put(E e)        添加元素,如果队列满,则阻塞

offer(E e)      添加元素,如果队列满则返回false

offer(E e, long timeout, TimeUnit unit) 同上,但是有timeout

 

 

take(E e)      取出元素,如果队列空,则阻塞

poll( long timeout, TimeUnit unit) 在一定时间内取出元素,如无则返回null

drainTo()     一次性取出所有元素

 

2.各种阻塞队列

1) ArrayBlockingQueue 

以数组为基础的queue, 建立时需要定义大小,同时可以定义公平锁 or not.

有一个特点是 元素在取出或者插入时使用同一个锁,或许是对性能影响不大?

 

2) LinkedBlockingQueue

以链表为基础的queue,建立时是不定义大小的,所以要注意生产太多内存会耗尽

与数组队列不同的是该队列在取出与写入是有分开的锁,理论上性能更好

但是具体还是得分情况,最好都试试,据说VM对array的优化很好,可能有惊喜

 

3) PriorityBlockingQueue

带有优先级的队列,同样不限定大小,只有在队列空的时候进行阻塞

定义的时候需要给与一个comparator

 

4)DelayQueue

延迟队列,具体使用情况,例如长时间无响应等

其实就是一个特别版的优先队列,优先度就是延迟,在获取元素的时候会根据延迟的大小决定是否返回元素

 

5)SynchronousQueue

同步队列,我的理解就是一个put对应一个take, 只有空的时候可以放东西,只能放一个

暂时没想到什么使用场合

 

3. 内部实现

具体细节我就不讲了,大家可以自己看,最主要就是在存取的时候用了之前说的lock来进行并发控制

而timeout则是由lock里condition的await实现,总体都不难,很容易看懂的

 

EXAMPLE

static ArrayBlockingQueue<Integer> arrayQueue;
    static long startTime = System.currentTimeMillis();

    public static void main(String[] args) {
        arrayQueue = new ArrayBlockingQueue<Integer>(10);


        ExecutorService executor = Executors.newFixedThreadPool(4);

        //丢东西进去
        Runnable producer = new Runnable() {
            @Override
            public void run() {

                for (int i = 0; i < 5; i++) {
                    try {

                        int randomInt = i;
                        arrayQueue.put(randomInt);


                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        //拿东西出来
        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                while (true)
                {
                    try {

                        System.out.println("remain capacity = "+arrayQueue.remainingCapacity());
                        Integer result = arrayQueue.poll(1,TimeUnit.SECONDS);
                        Thread.sleep(50);
                        if (result==null){
                            return;
                        }

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        //因为我的线程池是4,所以放4个任务进去
        executor.submit(consumer);
        executor.submit(producer);
        executor.submit(producer);
        executor.submit(producer);

        executor.shutdown();



    }

  OUTPUT:

remain capacity = 10
remain capacity = 0
remain capacity = 0
remain capacity = 0
remain capacity = 0
remain capacity = 0
remain capacity = 1
remain capacity = 2
remain capacity = 3
remain capacity = 4
remain capacity = 5
remain capacity = 6
remain capacity = 7
remain capacity = 8
remain capacity = 9
remain capacity = 10

 

 

 

推荐阅读