java - Java ForkJoinPool 线程未完成
问题描述
我正在尝试使用 Java 流和 ForkJoinPool 并行化 for 循环,以控制使用的线程数。当使用单线程运行时,并行化代码返回与顺序程序相同的结果。顺序代码是一组标准的 for 循环:
for(String file : fileList){
for(String item : xList){
for(String x : aList) {
// action code
}
}
}
以下是我的并行实现:
ForkJoinPool threadPool = new ForkJoinPool(NUM_THREADS);
int chunkSize = aList.size()/NUM_THREADS;
for(String file : fileList){
for(String item : xList){
IntStream.range(0, NUM_THREADS)
.parallel().forEach(i -> threadPool.submit(() -> {
aList.subList(i*chunkSize, Math.min(i*chunkSize + chunkSize -1, aList.size()-1))
.forEach(x -> {
// action code
});
}));
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.MINUTES);
}
}
当使用超过 1 个线程时,只能完成有限次数的迭代。我试图使用.shutdown()
并.awaitTermination()
确保完成所有线程,但这似乎不起作用。每次运行的迭代次数差异很大(0-1500 之间)。
注意:我使用的是具有 8 个可用内核(4 个双核)的 Macbook Pro,并且我的操作代码不包含使并行化不安全的引用。
任何建议将不胜感激,谢谢!
解决方案
我认为您遇到的实际问题是由您调用shutdown
. ForkJoinPool
如果您查看 javadoc,这会导致“有序关闭,其中执行先前提交的任务,但不会接受新任务” - 即。我希望只有一项任务能够真正完成。
顺便说一句,使用ForkJoinPool
你使用它的方式没有任何意义。AForkJoinPool
旨在递归地拆分工作负载,这与您在循环中创建子列表所做的不同 - 但 aForkJoinPool
应该由RecursiveAction
s 自己拆分其工作,而不是像您在循环中那样预先拆分它。不过,这只是一个旁注;您的代码应该可以正常运行,但是如果您只是将任务提交给正常的ExecutorService
,例如您得到的Executors.newFixedThreadPool(parallelism)
而不是 by ,则会更清楚new ForkJoinPool()
。
推荐阅读
- javascript - 带有异议 JS 的 INNER JOIN 查询?
- python - Django:不允许的方法(POST):
- java - 使用 Spring WebFlux 和 Controller 测试失败
- sqlite - 如何获得一列>X中值的比例?
- html - 如何在背景上使用带有文字的idangero swiper然后响应?
- c# - 如何用两个或三个 SetOnClickListener 修复我的列表视图
- python - 在 Django 中尝试通过电子邮件或用户名登录时显示未定义的路径
- python - 是否有一个 pandas 函数来评估子字符串是否在给定字符串中?
- ios - 在 UIViewControllerAnimatedTransitioning 中,“to ViewController”存在视图,但“to View”为 nil
- javascript - this.state.[object].map 不是函数