首页 > 解决方案 > 从 java 中的 Futures 列表中收集结果

问题描述

我正在尝试使用期货进行并发 api 调用。代码:

private void init() throws ExecutionException, InterruptedException {
    Long start = System.currentTimeMillis();
    List<ApiResponse> responses = fetchAllUsingFuture(ids, 3);
    log.info(responses.toString());
    Long finish = System.currentTimeMillis();
    log.info(MessageFormat.format("Process duration: {0} in ms", finish-start));
}

private List<ApiResponse> fetchAllUsingFuture(List<String> ids, int threadCount) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    List<List<String>> chunks = Utils.splitToChunks(ids, threadCount);
    List<Future<List<ApiResponse>>> futures = new ArrayList<>();
    chunks.forEach(chunk -> {
        futures.add(wrapFetchInFuture(chunk));
    });
    Future<List<ApiResponse>> resultFuture = executorService.submit(() -> {
        List<ApiResponse> responses = new ArrayList<>();
        futures.forEach(future -> {
            try {
                responses.addAll(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        return responses;
    });

    executorService.shutdown();
    return resultFuture.get();
}



private Future<List<ApiResponse>> wrapFetchInFuture(List<String> ids) {
    return new FutureTask<>(() -> {
        List<ApiResponse> responses = new ArrayList<>();
        ids.forEach(id -> {
            responses.add(fetchData(id));
        });
        return responses;
    });
}

private ApiResponse fetchData(String id) {
    ResponseEntity<ApiResponse> response = restTemplate.getForEntity(id, ApiResponse.class);
    log.info(MessageFormat.format("Fetching from {0}", id));
    ApiResponse body = response.getBody();
    log.info(MessageFormat.format("Retrieved {0}", body));
    return body;
}

它不执行,应用程序启动,然后只是挂起。期货没有得到履行。所有建议表示赞赏。PS我知道使用CompletableFuture更容易做到这一点,我只是想知道如何使用Futures做到这一点

标签: java

解决方案


在问题的原始版本中,您正在创建一个列表,FutureTasks但从未将它们发送ExecutorService给运行它们。任务永远不会完成,所以Future.get永远阻塞。

在问题的更新版本中,您已将执行等待的代码作为任务放入执行程序服务中。FutureTasks 永远不会运行,所以FutureTask.get仍然会永远阻塞。

我建议您将代码更改fetchAllUsingFuture为:

    List<Callable<List<ApiResponse>>> tasks = new ArrayList<>();
    chunks.forEach(chunk -> {
        tasks.add(wrapFetchInCallable(chunk));
    });
    List<Future<List<ApiResponse>>> futures = executorService.invokeAll(tasks);

wherewrapFetchInCallable创建 aCallable而不是FutureTask

private static Callable<List<ApiResponse>> wrapFetchInCallable(List<String> ids) {
    return () -> {
        List<ApiResponse> responses = new ArrayList<>();
        ids.forEach(id -> {
            responses.add(fetchData(id));
        });
        return responses;
    };
}

推荐阅读