首页 > 解决方案 > 从队列中取出的正确方法?

问题描述

下面的代码随机冻结。

队列在开始时是预先填充的,并且仅在线程开始从中获取项目之后才被获取。

我想我没有正确使用队列。尽管进行了isEmpty()检查,但当一个线程尝试获取一项时,队列可能为空,使其无限期等待。

    @Override
    public void run() {
        long milisecs;
        try {
            while ( ! queue.isEmpty()) {   // !!!
                milisecs  = queue.take();  // !!!
                worker(milisecs);
            }
        } catch (InterruptedException ex) {}
    }

例如,如果发生这种情况,它会挂起:

  1. threadA 检查是否queue.isEmpty(),得到一个 false 并尝试继续。
  2. threadBtake()队列中的最后一项
  3. threadA 尝试take()从空队列中获取项目,使其挂起。

应该同步“如果队列不为空则取”的过程,以使队列之间不会发生变化。

这样做的正确方法是什么?

完整代码如下。每次运行大约需要 1 秒。

package multithreadperformance;

import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class MultithreadPerformance implements Runnable {
    
    static int numThreads = 50;
    static int numJobs = 5000;
    
    final BlockingQueue<Long> queue = new LinkedBlockingQueue<>();;
    static ArrayList<Thread> threads;
    
    public static void main(String[] args) {
        MultithreadPerformance bench = new MultithreadPerformance();
        bench.go();
    }

    public void go() {
        System.out.print("Go... ");
        
        long t0 = System.nanoTime();
        
        // Fill up the queue of jobs with a random number of miliseconds.
        long milisecs, milisecsMax = 20; // ms
        //
        try {
            for (int i = 0; i < numJobs; i++) {
                milisecs = ThreadLocalRandom.current().nextLong(milisecsMax);
                queue.put(milisecs);
            }
        } catch (InterruptedException ex) {
                System.out.println(ex.toString());
        } 
        
        // Create all threads
        threads = new ArrayList<>();
        for(int i = 0; i < numThreads; i++) {
            Thread thread = new Thread(this);
            thread.setName("Thread" + i);
            threads.add(thread);
        }

        // Start all threads
        threads.forEach((thread) -> {thread.start();});

        // Join all threads
        threads.forEach((thread) -> {try {
            thread.join();
            } catch (InterruptedException ex) {
                System.out.println(ex.toString());
            }
        });
        
        long et = System.nanoTime() - t0;
        System.out.println(String.format("done. Elapsed time %.3f s.", et/1e9));
    }

    // Worker function 
    // Sleep a number of miliseconds.
    public void worker(long milisecs) throws InterruptedException {
        Thread.sleep(milisecs);
    }
    
    @Override
    public void run() {
        long milisecs;
        try {
            while ( ! queue.isEmpty()) {
                milisecs  = queue.take();
                worker(milisecs);
            }
        } catch (InterruptedException ex) {
            System.out.println(ex.toString());
        }
    }
}

标签: javaconcurrency

解决方案


您可以调用poll()which 将自动删除队列的头部,或者null如果队列为空则返回。

Long millisecs;
while ( (millisecs = queue.poll()) != null) {  
     worker(millisecs);
}

推荐阅读