首页 > 解决方案 > 从具有多个线程的阻塞队列中读取

问题描述

我有一个使用阻塞队列的生产者-消费者模型,其中 4 个线程从目录读取文件将其放入阻塞队列,4 个线程(消费者)从阻塞队列中读取。

我的问题是每次只有一个消费者从 Blockingqueue 读取,而其他 3 个消费者线程没有读取:

        final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);

            CompletableFuture<Void> completableFutureProducer = produceUrls(files, queue, checker);
//not providing code for produceData , it is working file with all 4 //threads writing to Blocking queue. Here is the consumer code.

    private CompletableFuture<Validator> consumeData(
            final Response checker,
            final CompletableFuture<Void> urls
    ) {
        return CompletableFuture.supplyAsync(checker, 4)
                .whenComplete((result, err) -> {
                    if (err != null) {
                        LOG.error("consuming url worker failed!", err);
                        urls.cancel(true);
                    }
    });


    }
  completableFutureProducer.join();
            completableFutureConsumer.join();

这是我的代码。有人可以告诉我我做错了什么吗?或帮助提供正确的代码。为什么一位消费者从阻塞队列中读取数据。

为从阻塞队列读取的响应类添加代码:

    @Slf4j
    public final class Response implements Supplier<Check> {
        private final BlockingQueue<byte[]> data;
        private final AtomicBoolean producersComplete;
        private final Calendar calendar = Calendar.getInstance();
    
        public ResponseCode(
                final BlockingQueue<byte[]> data
        ) {
            this.data = data;
            producersDone = new AtomicBoolean();
    
        }
public void notifyProducersDone() {
    producersComplete.set(true);
}

        @Override
        public Check get() {
            try {
                Check check = null;
                try {
                    while (!data.isEmpty() || !producersDone.get()) {
                        final byte[] item = data.poll(1, TimeUnit.SECONDS);
                        if (item != null) {
                           LOG.info("{}",new String(item));
// I see only one thread printing result here .
                            validator = validateData(item);
                        }
                    }
        
                } catch (InterruptedException | IOException e) {
                    Thread.currentThread().interrupt();
                    throw new WriteException("Exception occurred while data validation", e);
        
                } 
                return check;
            } finally {
                LOG.info("Done reading data from BlockingQueue");
            }
        }
    }

标签: javamultithreadingblockingqueue

解决方案


仅凭这一点很难诊断,但检查可能不正确,data.isEmpty()因为队列可能碰巧暂时为空(但稍后获取项目)。因此,您的线程可能会在遇到临时空队列时立即退出。

相反,如果生产者完成并且您从poll. 这样,只有在确实没有更多项目要处理时,线程才会退出。

尽管您正在返回最后一项(单独)的结果,但这有点奇怪。你确定这是你想要的吗?

编辑:我最近做了一些非常相似的事情。是一个从文件读取的类,以多线程方式转换行,然后写入不同的文件(保留行的顺序)。
它还使用一个BlockingQueue. 它与您的代码非常相似,但它不检查quue.isEmpty()上述原因。这对我来说可以。


推荐阅读