首页 > 解决方案 > 等待所有阻塞队列元素取出后处理

问题描述

在以下场景中,终结器线程必须等待消费者线程处理完所有队列元素才能完成执行:

private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private final Object queueMonitor = new Object();

// Consumer thread
while (true) {
    Object element = queue.take();
    consume(element);
    synchronized (queueMonitor) {
        queueMonitor.notifyAll();
    }
}

// Finalizer thread
synchronized (queueMonitor) {
    while (!queue.isEmpty()) {
        queueMonitor.wait();
    }
}

随着时间的推移,元素被添加到队列中。消费者守护线程一直运行,直到 JVM 终止,此时必须允许它完成对所有排队元素的处理。目前这是由终结器线程完成的,它是一个关闭钩子,应该延迟在 JVM 终止时杀死消费者线程。

问题:
如果在从队列中取出最后一个元素之后启动终结器线程,则 while 循环条件的计算结果为false,因此执行完成而consume()尚未返回,因为queueMonitor完全跳过了等待。

研究:
一个理想的解决方案是查看队列,然后在元素被消耗后将其删除。

标签: javamultithreadingconcurrencyblockingqueue

解决方案


一种方法可能是您使用CountDownLatch- 在其上放置终结器块,然后在consume().

基本上不阻塞队列,阻塞任务完成。

private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private volatile boolean running = true;
private final CountDownLatch terminationLatch = new CountDownLatch(1);

// Consumer thread
while (running || !queue.isEmpty()) {
    Object element = queue.poll(100, TimeUnit.MILLISECONDS);
    if (element == null) continue;
    consume(element);
}
terminationLatch.countDown();

// Finalizer thread
running = false;
terminationLatch.await();

推荐阅读