java - 如何通过缓冲完全并行化 Java8 流?
问题描述
我有一些这样的代码:
Stream<Item> stream = listPaths.parallelStream().flatMap(path -> { ... })
我还添加了这个:
System.setProperty(
"java.util.concurrent.ForkJoinPool.common.parallelism",
String.valueOf(Runtime.getRuntime().availableProcessors() * 4));
后来我打电话stream.forEach(...)
但是,我发现在一台 32 核的机器上,只使用了 5 到 8 个核。
我相信发生的事情是 flatMap() 内的代码和 forEach() 内的代码因不同的外部资源而遭受 I/O 延迟问题,并以“适应和开始”的方式返回数据——与“拉”流的性质。
是否有一个简单的(惯用的,而不是“编写自己的 200 行代码”)将流包装到某种“流缓冲区”中,这样可以在提供 forEach( )?
解决方案
我认为最好的方法是使用响应式流。有很多方法可以解决这个问题:
- 使用
<Flowable>
- 使用
RxJava
或者第三,使用Spring's Reactor framework
它们都有调度机制。就个人而言,我可以使用其中一种算法并且仍然对它们感到满意,除非我想编写 200 行代码。
推荐阅读
- powershell - 用于“监视”新文件的目录并移动到新位置的 Powershell 脚本
- java - 调用私有静态 void 方法
- iis - IIS Rewrite 模块不会在第一次加载页面时重定向
- c# - 关闭对话框后无法关注任何控件
- python - 如何获取导致特定总和的 top-x 元素
- r - 在R中的getURL函数中隐藏密码
- asp.net - 使用 Asp.Net/VB.Net 更改视频的 src
- error-handling - VBScript CopyFile 报告错误 53 File Not Found,但已成功复制文件
- typescript - 在具有两个存储的 mapStateToProps 中未定义 Redux 状态
- r - 查找某个数字属于哪一行