java - 如何从队列中分块取出项目?
问题描述
我有多个生产者线程,它们同时将对象添加到共享队列中。
我想创建一个从该共享队列中读取以进行进一步数据处理(数据库批量插入)的单线程使用者。
问题:我只想以块的形式从队列中获取数据,以便在批量插入期间获得更好的性能。因此,我必须以某种方式检测队列中有多少项目,然后从队列中取出所有这些项目,然后再次清空队列。
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
ExecutorService pes = Executors.newFixedThreadPool(4);
ExecutorService ces = Executors.newFixedThreadPool(1);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
pes.submit(new Producer(sharedQueue, 3));
pes.submit(new Producer(sharedQueue, 4));
ces.submit(new Consumer(sharedQueue, 1));
class Producer implements Runnable {
run() {
...
sharedQueue.put(obj);
}
}
class Consumer implements Runnable {
run() {
...
sharedQueue.take();
}
}
消费者问题:如何轮询共享队列,等待有 X 个项目的队列,然后取出所有项目并同时清空队列(以便消费者可以再次开始轮询和等待)?
我愿意接受任何建议,并且不一定受上述代码的约束。
解决方案
我最近开发了这个实用程序,如果队列元素未达到批处理大小,则使用刷新超时来批处理 BlockingQueue 元素。它还支持使用多个实例来详细说明同一组数据的 fanOut 模式:
// Instantiate the registry
FQueueRegistry registry = new FQueueRegistry();
// Build FQueue consumer
registry.buildFQueue(String.class)
.batch()
.withChunkSize(5)
.withFlushTimeout(1)
.withFlushTimeUnit(TimeUnit.SECONDS)
.done()
.consume(() -> (broadcaster, elms) -> System.out.println("elms batched are: "+elms.size()));
// Push data into queue
for(int i = 0; i < 10; i++){
registry.sendBroadcast("Sample"+i);
}
更多信息在这里!
推荐阅读
- d3.js - d3将csv文件导入数组
- c# - CEFsharp浏览器注入jquery
- google-sheets - 谷歌电子表格条件格式基于日期的列的不同单元格
- swift - 更改 tabBarItem TableViewController 后看起来像黑屏
- go - 如何使用 Go 构建一个可以接收和发送消息到多个客户端的服务器?
- python - 如何将数据值添加到 pandas df.plot(kind='bar') 列?
- c++ - Qt5 抛出 std::bad_alloc
- python - ModuleNotFoundError:没有名为“citipy”的模块错误
- ionic-framework - 在 PlayStore 中自动检测 Ionic 3 应用程序的新版本
- java - 如何在 Corda 中创建自定义内联子流,例如 CollectSignaturesFlow/SignTransactionFlow