首页 > 解决方案 > ComputableFuture 并行调用

问题描述

我想并行运行 10 个 API。每个 API 都会返回一些值。我想在值的总计数等于 100 时停止,即,如果在获得所有 API 的结果之前获得 100 个结果,我不想等待所有 10 个 API。所以我想玩CompletableFuture.anyOf()循环并返回,但我无法找出正确的语法。另外,是否还有其他有效的方法可以做到这一点?

请回复。

提前致谢!

标签: javaasynchronousjava-8parallel-processingcompletable-future

解决方案


您可以使用 CountDownLatch

    ExecutorService executor = Executors.newFixedThreadPool(10);

    List<Integer> results = new ArrayList<>();
    int maxResults = 100;
    CountDownLatch latch = new CountDownLatch(maxResults);

    for (int i = 0; i < 10; ++i) {
        int apiNumber = i;
        executor.execute(() -> {
            while (results.size() < maxResults) {
                try {
                    Thread.sleep(new Random().nextInt(1000));
                    System.out.println("API-" + apiNumber + " call");
                    int[] newValues = new Random().ints(0, 10).limit(new Random().nextInt(10)).toArray(); // API call
                    for (int value : newValues) {
                        synchronized (results) {
                            if (results.size() < maxResults) {
                                results.add(value);
                                latch.countDown();
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    latch.await();

    System.out.println("The results have been calculated (" + results.size() + ")");


    executor.shutdown();

或移相器

    ExecutorService executor = Executors.newFixedThreadPool(10);

    List<Integer> results = new ArrayList<>();
    int maxResults = 100;
    Phaser phaser = new Phaser(1);
    phaser.register();

    for (int i = 0; i < 10; ++i) {
        int apiNumber = i;
        executor.execute(() -> {
            while (results.size() < maxResults) {
                try {
                    Thread.sleep(new Random().nextInt(1000));
                    System.out.println("API-" + apiNumber + " call");
                    int[] newValues = new Random().ints(0, 10).limit(new Random().nextInt(10)).toArray(); // API call
                    for (int value : newValues) {
                        synchronized (results) {
                            results.add(value);
                            if (results.size() >= maxResults) {
                                phaser.arrive();
                                break;
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    phaser.arriveAndAwaitAdvance();

    System.out.println("The results have been calculated (" + results.size() + ")");

    executor.shutdown();

推荐阅读