首页 > 解决方案 > 如何从队列中分块取出项目?

问题描述

我有多个生产者线程,它们同时将对象添加到共享队列中。

我想创建一个从该共享队列中读取以进行进一步数据处理(数据库批量插入)的单线程使用者。

问题:我只想以块的形式从队列中获取数据,以便在批量插入期间获得更好的性能。因此,我必须以某种方式检测队列中有多少项目,然后从队列中取出所有这些项目,然后再次清空队列。

 BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();

 ExecutorService pes = Executors.newFixedThreadPool(4);
 ExecutorService ces = Executors.newFixedThreadPool(1);

 pes.submit(new Producer(sharedQueue, 1));
 pes.submit(new Producer(sharedQueue, 2));
 pes.submit(new Producer(sharedQueue, 3));
 pes.submit(new Producer(sharedQueue, 4));
 ces.submit(new Consumer(sharedQueue, 1));

class Producer implements Runnable {
    run() {
            ...
            sharedQueue.put(obj);
    }
}

class Consumer implements Runnable {
    run() {
            ...
            sharedQueue.take();
    }
}

消费者问题:如何轮询共享队列,等待有 X 个项目的队列,然后取出所有项目并同时清空队列(以便消费者可以再次开始轮询和等待)?

我愿意接受任何建议,并且不一定受上述代码的约束。

标签: javaconcurrencyqueue

解决方案


我最近开发了这个实用程序,如果队列元素未达到批处理大小,则使用刷新超时来批处理 BlockingQueue 元素。它还支持使用多个实例来详细说明同一组数据的 fanOut 模式:

// Instantiate the registry
FQueueRegistry registry = new FQueueRegistry();

// Build FQueue consumer
registry.buildFQueue(String.class)
                .batch()
                .withChunkSize(5)
                .withFlushTimeout(1)
                .withFlushTimeUnit(TimeUnit.SECONDS)
                .done()
                .consume(() -> (broadcaster, elms) -> System.out.println("elms batched are: "+elms.size()));

// Push data into queue
for(int i = 0; i < 10; i++){
        registry.sendBroadcast("Sample"+i);
}

更多信息在这里!

https://github.com/fulmicotone/io.fulmicotone.fqueue


推荐阅读