首页 > 解决方案 > 从迭代器创建的 CompletableFuture 流不会被延迟评估

问题描述

我在如何以及何时完成可完成的期货方面有点挣扎。我创建了这个测试用例:

import org.junit.Test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamOfCompletableFuturesTest {
    @Test
    public void testList() {
        completeFirstTwoElements(
                Stream.of("list one", "list two", "list three", "list four", "list five")
        );
    }

    @Test
    public void testIterator() {
        Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

        completeFirstTwoElements(
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
        );
    }

    private void completeFirstTwoElements(Stream<String> stream) {
        stream
                .map(this::cf)
                .limit(2)
                .parallel()
                .forEach(cf -> {
                    try {
                        System.out.println(cf.get());
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    private CompletableFuture<String> cf(String result) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Running " + result);
            return result;
        });
    }
}

输出是:

Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one

testList方法按预期工作。's 仅在最后CompletableFuture评估,因此在 limit 方法之后仅保留前两项。

然而,这个testIterator方法是出乎意料的。所有CompletableFuture的都完成了,限制只在之后完成。

如果我parallel()从流中删除该方法,它会按预期工作。但是,处理(forEach())应该并行完成,因为在我的完整程序中它是一个长时间运行的方法。

任何人都可以解释为什么会这样吗?

看起来这取决于 Java 版本,所以我使用的是 1.8:

$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

标签: javajava-streamcompletable-futurespliterator

解决方案


limit()并行性适用于整个管道,因此您无法真正控制在并行应用之前将执行什么Stream。唯一的保证是,之后的limit()内容只会在保留的元素上执行。

两者之间的差异可能是由于一些实现细节或其他Stream特性。事实上,您可以通过使用SIZED特征来轻松地反转行为。似乎当Stream具有已知大小时,只处理 2 个元素。

因此,例如,应用一个简单的filter()会丢失列表版本的大小:

completeFirstTwoElements(
        Stream.of("list one", "list two", "list three", "list four", "list five").filter(a -> true)
);

输出例如:

Running list one
Running list five
Running list two
Running list three
list one
list two

并且不使用“修复”行为的未知大小版本:Spliterator.spliterator()

Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

completeFirstTwoElements(
        StreamSupport.stream(Spliterators.spliterator(iterator, Spliterator.ORDERED, 5), false)
);

输出:

Running iterator two
Running iterator one
iterator one
iterator two

推荐阅读