首页 > 解决方案 > 如何为 Java 8 并行流指定 ForkJoinPool?

问题描述

据我所知,并行流使用默认值ForkJoinPool.commonPool,默认情况下它的线程数比您的处理器少一个。我想使用我自己的自定义线程池。

像这样:

@Test
public void stream() throws Exception {
    //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    ForkJoinPool pool = new ForkJoinPool(10);
    List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
    long start = System.currentTimeMillis();
    List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
        try {
            // read from database
            Thread.sleep(1000);
            System.out.println("task" + item + ":" + Thread.currentThread());
        } catch (Exception e) {
        }
        return item * 10;
    })).get().collect(Collectors.toList());
    System.out.println(result);
    System.out.println(System.currentTimeMillis() - start);
}

结果: 在此处输入图像描述

我的习惯ForkJoinPool从未使用过。我像这样更改默认并行度:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

它运作良好 - 任务只需大约 1 秒。

在我的应用程序中,任务包含繁重的 IO 操作(从 db 读取数据)。所以我需要更高的并行度,但我不想更改 JVM 属性。

那么指定我自己的正确方法是什么ForkJoinPool

或者如何在 IO 密集型情况下使用并行流?

标签: java-8java-stream

解决方案


流是懒惰的;当您开始终端操作时,所有工作都已完成。在您的情况下,终端操作是.collect(Collectors.toList()),您在main线程中调用get(). 因此,实际工作的完成方式与您在main线程中构建整个流的方式相同。

为了使您的池生效,您必须将终端操作移动到提交的任务中:

ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {}
    return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);

我们还可以通过在main线程中构造流,只将终端操作提交到池中来演示终端操作的相关性:

Stream<Integer> stream = testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {}
    return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();

但是您应该记住,这是未记录的行为,不能保证。实际的答案一定是当前形式的 Stream API 没有线程控制(也没有帮助处理检查的异常),不适合并行 I/O 操作。


推荐阅读