java - 什么是 ParallelStream 队列行为?
问题描述
我正在使用 parallelStream 并行执行一些文件上传,有些是大文件,有些是小文件。我注意到并非所有工人都被使用。
一开始一切运行良好,所有线程都在使用(我将并行度选项设置为 16)。然后在某个点(一旦它获得更大的文件),它只使用一个线程
简化代码:
files.parallelStream().forEach((file) -> {
try (FileInputStream fileInputStream = new FileInputStream(file)) {
IDocumentStorageAdaptor uploader = null;
try {
logger.debug("Adaptors before taking: " + uploaderPool.size());
uploader = uploaderPool.take();
logger.debug("Took an adaptor!");
logger.debug("Adaptors after taking: " + uploaderPool.size());
uploader.addNewFile(file);
} finally {
if (uploader != null) {
logger.debug("Adding one back!");
uploaderPool.put(uploader);
logger.debug("Adaptors after putting: " + uploaderPool.size());
}
}
} catch (InterruptedException | IOException e) {
throw new UploadException(e);
}
});
uploaderPool 是一个 ArrayBlockingQueue。日志:
[ForkJoinPool.commonPool-worker-8] - Adaptors before taking: 0
[ForkJoinPool.commonPool-worker-15] - Adding one back!
[ForkJoinPool.commonPool-worker-8] - Took an adaptor!
[ForkJoinPool.commonPool-worker-15] - Adaptors after putting: 0
...
...
...
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15
似乎所有工作(列表中的项目)都分布在 16 个线程中,委派给一个线程的事情只会等到线程空闲工作而不是使用可用线程。有没有办法改变 parallelStream 的工作排队方式?我阅读了 forkjoinpool 文档,其中提到了工作窃取,但仅适用于衍生的子任务。
我的另一个计划可能是随机化我正在使用 parallelStream 的列表的排序,也许这会平衡一些事情。
谢谢!
解决方案
并行流的拆分与计算启发式算法针对数据并行操作进行了调整,而不是针对 IO 并行操作进行了调整。(换句话说,它们被调整为保持 CPU 忙碌,但不会产生比 CPU 更多的任务。)因此,它们偏向于计算而不是分叉。目前没有覆盖这些选择的选项。
推荐阅读
- java - 从同一列的组件菜单中删除 JTable 列时出错
- matlab - 将 2×4 矩阵更改为 2×2×4 矩阵
- php - Laravel Validator 针对不同表的多个唯一规则
- reactjs - 使用可加载组件在 Now 上部署
- angularjs - 使用 AngularJS 对大表进行排序
- python - 不同框架中的 Tkinter 条目小部件
- forms - SwiftUI 删除 NavigationView 表单中的空白行
- c++ - QWebEngine 不能正确处理相对 url 链接
- c# - 远程调试器命中断点,但 Visual Studio 没有注意到
- ios - 在 iOS 13 上未调用 didConnectPeripheral 委托