首页 > 解决方案 > 并行流在不同的操作下可以正常工作吗?

问题描述

我正在阅读有关无国籍状态的内容,并在doc中遇到了这个:

如果流操作的行为参数是有状态的,则流管道结果可能是不确定的或不正确的。有状态的 lambda(或实现适当功能接口的其他对象)的结果取决于在流管道执行期间可能更改的任何状态。

现在,如果我有一个字符串列表(strList比如说),然后尝试通过以下方式使用并行流从中删除重复的字符串:

List<String> resultOne = strList.parallelStream().distinct().collect(Collectors.toList());

或者如果我们想要不区分大小写:

List<String> result2 = strList.parallelStream().map(String::toLowerCase)
                       .distinct().collect(Collectors.toList());

这段代码是否有任何问题,因为并行流会将输入拆分,并且在一个块中不同并不一定意味着在整个输入中不同?

编辑(以下答案的快速摘要)

distinct是一个有状态的操作,在有状态的中间操作的情况下,并行流可能需要多次传递或大量缓冲开销。如果元素的顺序不相关,也distinct可以更有效地实现。也根据文档

对于有序流,不同元素的选择是稳定的(对于重复元素,会保留在遇到顺序中首先出现的元素。)对于无序流,不做稳定性保证。

但是如果有序流并行运行,distinct 可能是不稳定的 - 意味着它会在重复的情况下保留任意元素,而不一定是预期的第一个元素distinct

链接

在内部,distinct() 操作保留了一个 Set,其中包含以前见过的元素,但它隐藏在操作中,我们无法从应用程序代码中获取它。

因此,在并行流的情况下,它可能会消耗整个流或可能使用 CHM(类似ConcurrentHashMap.newKeySet())。对于有序的,它很可能会使用LinkedHashSet或类似的结构。

标签: javajava-8java-stream

解决方案


粗略指出doc重点,我的)的相关部分:

中间操作又分为无状态操作和有状态操作。无状态操作,例如过滤器和映射,在处理新元素时不保留先前看到的元素的状态——每个元素都可以独立于其他元素的操作进行处理。有状态的操作,例如 distinct 和 sorted,在处理新元素时可能会合并来自先前看到的元素的状态

有状态的操作可能需要在产生结果之前处理整个输入。例如,在查看流的所有元素之前,无法通过对流进行排序产生任何结果。因此,在并行计算下,一些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据。仅包含无状态中间操作的管道可以单次处理,无论是顺序的还是并行的,数据缓冲最少

如果您进一步阅读(订购部分):

流可能有也可能没有定义的相遇顺序。流是否有遇到顺序取决于源和中间操作。某些流源(例如 List 或数组)本质上是有序的,而其他流源(例如 HashSet)则不是。一些中间操作,例如 sorted(),可能会对原本无序的流施加遇到顺序,而其他操作可能会使有序流呈现无序,例如 BaseStream.unordered()。此外,某些终端操作可能会忽略遇到顺序,例如 forEach()。

...

对于并行流,放宽排序约束有时可以提高执行效率。如果元素的排序不相关,则可以更有效地实现某些聚合操作,例如过滤重复项 (distinct()) 或分组缩减 (Collectors.groupingBy())。类似地,本质上与遇到顺序相关的操作,例如 limit(),可能需要缓冲以确保正确排序,从而破坏并行性的好处。如果流有遇到顺序,但用户并不特别关心遇到顺序,使用 unordered() 显式对流进行降序可能会提高某些有状态或终端操作的并行性能. 然而,大多数流管道,例如上面的“块的权重之和”示例,即使在排序约束下仍然有效地并行化。

综上所述,

  • distinct 可以很好地处理并行流,但您可能已经知道,它必须在继续之前消耗整个流,这可能会占用大量内存。
  • 如果项目的来源是无序集合(例如 hashset)或流是unordered()distinct则不担心对输出进行排序,因此将是高效的

.unordered()如果您不担心顺序并希望看到更多性能,解决方案是添加到流管道中。

List<String> result2 = strList.parallelStream()
                              .unordered()
                              .map(String::toLowerCase)
                              .distinct()
                              .collect(Collectors.toList());

唉,Java 中没有(可用的内置)并发哈希集(除非他们很聪明ConcurrentHashMap),所以我只能给你留下一个不幸的可能性,即 distinct 是使用常规 Java 集以阻塞方式实现的。在这种情况下,我看不出做并行不同的任何好处。


编辑:我说得太早了。使用具有 distinct 的并行流可能会有一些好处。它看起来distinct比我最初想象的更聪明。请参阅@Eugene 的 回答


推荐阅读