首页 > 解决方案 > 如何通过缓冲完全并行化 Java8 流?

问题描述

我有一些这样的代码:

Stream<Item> stream = listPaths.parallelStream().flatMap(path -> { ... })

我还添加了这个:

    System.setProperty(
            "java.util.concurrent.ForkJoinPool.common.parallelism",
            String.valueOf(Runtime.getRuntime().availableProcessors() * 4));

后来我打电话stream.forEach(...)

但是,我发现在一台 32 核的机器上,只使用了 5 到 8 个核。

我相信发生的事情是 flatMap() 内的代码和 forEach() 内的代码因不同的外部资源而遭受 I/O 延迟问题,并以“适应和开始”的方式返回数据——与“拉”流的性质。

是否有一个简单的(惯用的,而不是“编写自己的 200 行代码”)将流包装到某种“流缓冲区”中,这样​​可以在提供 forEach( )?

标签: javajava-8java-stream

解决方案


我认为最好的方法是使用响应式流。有很多方法可以解决这个问题:

  • 使用<Flowable>
  • 使用RxJava

或者第三,使用Spring's Reactor framework 它们都有调度机制。就个人而言,我可以使用其中一种算法并且仍然对它们感到满意,除非我想编写 200 行代码。


推荐阅读