首页 > 解决方案 > Stream.parallel() 不会更新 spliterator 的特性吗?

问题描述

这个问题是基于这个问题的答案Stream.of 和 IntStream.range 有什么区别?

由于IntStream.range产生了一个已经排序的流,下面代码的输出只会生成输出0

IntStream.range(0, 4)
         .peek(System.out::println)
         .sorted()
         .findFirst();

分离器也将具有SORTED特征。下面的代码返回true

System.out.println(
    IntStream.range(0, 4)
             .spliterator()
             .hasCharacteristics(Spliterator.SORTED)
);

现在,如果我parallel()在第一个代码中引入 a ,那么正如预期的那样,输出将包含从0to的所有 4 个数字,3但以随机顺序排列,因为流不再排序,因为parallel().

IntStream.range(0, 4)
         .parallel()
         .peek(System.out::println)
         .sorted()
         .findFirst();

这将产生如下内容:(以任何随机顺序)

2
0
1
3

所以,我希望该SORTED属性已被删除,因为parallel(). 但是,下面的代码true也会返回。

System.out.println(
    IntStream.range(0, 4)
             .parallel()
             .spliterator()
             .hasCharacteristics(Spliterator.SORTED)
);

为什么不parallel()改变SORTED属性?并且由于所有四个数字都被打印出来,Java 如何意识到即使SORTED属性仍然存在,流也没有排序?

标签: javajava-stream

解决方案


究竟如何做到这一点在很大程度上是一个实现细节。您必须深入挖掘源代码才能真正了解原因。基本上,并行和顺序流水线的处理方式不同。查看AbstractPipeline.evaluate,它检查isParallel(),然后根据管道是否并行执行不同的操作。

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

如果您再查看SortedOps.OfInt,您会发现它覆盖了两个方法:

@Override
public Sink<Integer> opWrapSink(int flags, Sink sink) {
    Objects.requireNonNull(sink);

    if (StreamOpFlag.SORTED.isKnown(flags))
        return sink;
    else if (StreamOpFlag.SIZED.isKnown(flags))
        return new SizedIntSortingSink(sink);
    else
        return new IntSortingSink(sink);
}

@Override
public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                               Spliterator<P_IN> spliterator,
                                               IntFunction<Integer[]> generator) {
    if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
        return helper.evaluate(spliterator, false, generator);
    }
    else {
        Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);

        int[] content = n.asPrimitiveArray();
        Arrays.parallelSort(content);

        return Nodes.node(content);
    }
}

opWrapSink如果它是顺序管道,最终将被调用,并且opEvaluateParallel(顾名思义)将在它是并行流时被调用。请注意opWrapSink,如果管道已经排序(只是将其原封不动地返回),则不会对给定的接收器执行任何操作,但opEvaluateParallel始终评估拆分器。

另请注意,并行性和排序性并不相互排斥。您可以拥有具有这些特征的任意组合的流。

“排序”是 a 的一个特征Spliterator。从技术上讲,它不是 a 的特征Stream(就像“并行”一样)。当然,parallel可以使用具有全新特性的全新拆分器(从原始拆分器获取元素)创建一个流,但是当您可以重用相同的拆分器时,为什么要这样做呢?我想你在任何情况下都必须以不同的方式处理并行和顺序流。


推荐阅读