java-8 - 如何为 Java 8 并行流指定 ForkJoinPool?
问题描述
据我所知,并行流使用默认值ForkJoinPool.commonPool
,默认情况下它的线程数比您的处理器少一个。我想使用我自己的自定义线程池。
像这样:
@Test
public void stream() throws Exception {
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (Exception e) {
}
return item * 10;
})).get().collect(Collectors.toList());
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
}
我的习惯ForkJoinPool
从未使用过。我像这样更改默认并行度:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
它运作良好 - 任务只需大约 1 秒。
在我的应用程序中,任务包含繁重的 IO 操作(从 db 读取数据)。所以我需要更高的并行度,但我不想更改 JVM 属性。
那么指定我自己的正确方法是什么ForkJoinPool
?
或者如何在 IO 密集型情况下使用并行流?
解决方案
流是懒惰的;当您开始终端操作时,所有工作都已完成。在您的情况下,终端操作是.collect(Collectors.toList())
,您在main
线程中调用get()
. 因此,实际工作的完成方式与您在main
线程中构建整个流的方式相同。
为了使您的池生效,您必须将终端操作移动到提交的任务中:
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
我们还可以通过在main
线程中构造流,只将终端操作提交到池中来演示终端操作的相关性:
Stream<Integer> stream = testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();
但是您应该记住,这是未记录的行为,不能保证。实际的答案一定是当前形式的 Stream API 没有线程控制(也没有帮助处理检查的异常),不适合并行 I/O 操作。
推荐阅读
- javascript - 联系表单脚本 Ajax 无法识别来自 Mail.php 的正确结果
- react-native - 运行 React Native 应用程序时出现“找不到变量:a”错误
- unity3d - 等轴测 3D Unity 项目中的 2D 精灵
- python - 如何在pygame rect框中获取条形码扫描仪输出
- java - 在 JQuery 中使用 ajax 方法 .get() 和 .post() 添加到购物车
- javascript - 使用数据表上的单击事件调用“组件函数”
- apache - 如何在 msys2 上构建 apache httpd 2.* 及更高版本
- excel - VBA返回动态数组并分配给变量
- android - 如何防止手机屏幕在使用 ADB 调用 Android pie (9) 时关闭(或不使用,如果可能)
- python - 在散点图上绘制折线图