Joining CompletableFutures

Problem description

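I have been working with CompletableFuture lately, trying to parallelize long-running IO operations. I have to wait for everything to complete before returning, so I have used both

CompletableFuture.allOf().join()
stream().map(CompletableFuture::join)

to force execution. However, looking at the logs, I got the impression that allOf().join() was faster, so I wrote a test to check. Its final output always shows the stream join taking longer. One thing I noticed is that the difference shrinks if I skip the System.out.println() inside CompletableFuture.supplyAsync().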
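For reference, the two approaches can be sketched side by side as follows. This is a minimal illustration, not the test itself: the class name JoinStyles, the helper submit, and the pool size of 4 are assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class JoinStyles {

    // Hypothetical helper: submits n trivial tasks that each return their own index.
    static List<CompletableFuture<Integer>> submit(int n, ExecutorService pool) {
        return IntStream.range(0, n)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> i, pool))
                .collect(Collectors.toList());
    }

    // Approach 1: block on each future in list order.
    static List<Integer> joinEach(List<CompletableFuture<Integer>> futures) {
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Approach 2: block once on allOf(), then read the already-completed results.
    static List<Integer> joinAllOf(List<CompletableFuture<Integer>> futures) {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
        try {
            System.out.println(joinEach(submit(5, pool)));
            System.out.println(joinAllOf(submit(5, pool)));
        } finally {
            pool.shutdown(); // let the JVM exit once the tasks are done
        }
    }
}
```

In both variants the tasks are already running once supplyAsync() has been called; the join calls only wait for the results and return them in list order.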

Typical output from the test: streamJoin: 3196 ms, allof: 3055 ms

Is allOf().join() followed by collecting the results the fastest way, or is my test flawed?

package com.oyvind.completablefutures;

import org.junit.Test;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CompletableFutureTest {

    @Test
    public void joinPerformanceTest() throws Exception{

        // stream join
        long startStreamJoin = System.currentTimeMillis();
        List<Integer> result1 = listOfCompletableFutures("stream", Executors.newFixedThreadPool(100))
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());// trigger execution
        long spentTimeStreamJoin = System.currentTimeMillis() - startStreamJoin;

        // allOf() join
        long startAllOf = System.currentTimeMillis();
        List<CompletableFuture<Integer>> completableFutures = listOfCompletableFutures("allOf", Executors.newFixedThreadPool(100));
        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
        List<Integer> result2 = completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        long spentTimeAllOf = System.currentTimeMillis() - startAllOf;

        log("streamJoin: %s ms, allof: %s ms", spentTimeStreamJoin, spentTimeAllOf);
    }

    private List<CompletableFuture<Integer>> listOfCompletableFutures(String name, Executor executor) {

        return IntStream.range(1, 1000)
                .boxed()
                .map(
                        i -> CompletableFuture.supplyAsync(
                                () -> logAndSleepFor1Second(name),
                                executor
                        )
                )
                .collect(Collectors.toList());
    }

    private int logAndSleepFor1Second(String name) {
        log("Starting %s: %s", name, Thread.currentThread().getName());
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        return 1;
    }

    private void log(String format, Object... args) {
        System.out.println(LocalDateTime.now() + ": " + String.format(format, args));
    }
}

Tags: java, completable-future

Solution
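One way to check whether the measured gap comes from the test rather than from the join strategy is to remove the asymmetries between the two measurements. In the test from the question, the variant that runs first may also absorb one-time costs such as class loading and JIT compilation, and each timed region includes creating a fresh thread pool. The sketch below is a hedged rework under stated assumptions: the class name JoinTiming, the task count, pool size, sleep time, and number of warm-up rounds are all made up for illustration, and it is not a rigorous benchmark.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class JoinTiming {

    // Assumed sizes, much smaller than in the question, so the sketch runs quickly.
    static final int TASKS = 40;
    static final int THREADS = 20;
    static final long SLEEP_MS = 20;

    static int sleepTask() {
        try {
            Thread.sleep(SLEEP_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
        return 1;
    }

    static List<CompletableFuture<Integer>> submit(ExecutorService pool) {
        return IntStream.range(0, TASKS)
                .mapToObj(i -> CompletableFuture.supplyAsync(JoinTiming::sleepTask, pool))
                .collect(Collectors.toList());
    }

    static List<Integer> joinEach(List<CompletableFuture<Integer>> futures) {
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    static List<Integer> joinAllOf(List<CompletableFuture<Integer>> futures) {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    // Times only submission and joining; the pool and its threads exist before the clock starts.
    static long timeNanos(Function<List<CompletableFuture<Integer>>, List<Integer>> joiner) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                THREADS, THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.prestartAllCoreThreads();
        try {
            long start = System.nanoTime();
            List<Integer> result = joiner.apply(submit(pool));
            long elapsed = System.nanoTime() - start;
            if (result.size() != TASKS) {
                throw new IllegalStateException("unexpected result size: " + result.size());
            }
            return elapsed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Warm-up rounds whose timings are discarded (the count of 3 is an arbitrary assumption).
        for (int i = 0; i < 3; i++) {
            timeNanos(JoinTiming::joinEach);
            timeNanos(JoinTiming::joinAllOf);
        }
        // Alternate which variant runs first so neither is always measured first.
        for (int round = 0; round < 4; round++) {
            long each;
            long all;
            if (round % 2 == 0) {
                each = timeNanos(JoinTiming::joinEach);
                all = timeNanos(JoinTiming::joinAllOf);
            } else {
                all = timeNanos(JoinTiming::joinAllOf);
                each = timeNanos(JoinTiming::joinEach);
            }
            System.out.printf("round %d: streamJoin %d ms, allOf %d ms%n",
                    round, TimeUnit.NANOSECONDS.toMillis(each), TimeUnit.NANOSECONDS.toMillis(all));
        }
    }
}
```

If the two columns stay close to each other across rounds, that would suggest the gap in the original output came from measurement order and setup cost rather than from the join strategy. For conclusions that need to hold up, a dedicated harness such as JMH is likely a better fit than hand-rolled timing.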

