java - 从 java 中的 Futures 列表中收集结果
问题描述
我正在尝试使用期货进行并发 api 调用。代码:
private void init() throws ExecutionException, InterruptedException {
Long start = System.currentTimeMillis();
List<ApiResponse> responses = fetchAllUsingFuture(ids, 3);
log.info(responses.toString());
Long finish = System.currentTimeMillis();
log.info(MessageFormat.format("Process duration: {0} in ms", finish-start));
}
private List<ApiResponse> fetchAllUsingFuture(List<String> ids, int threadCount) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
List<List<String>> chunks = Utils.splitToChunks(ids, threadCount);
List<Future<List<ApiResponse>>> futures = new ArrayList<>();
chunks.forEach(chunk -> {
futures.add(wrapFetchInFuture(chunk));
});
Future<List<ApiResponse>> resultFuture = executorService.submit(() -> {
List<ApiResponse> responses = new ArrayList<>();
futures.forEach(future -> {
try {
responses.addAll(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
return responses;
});
executorService.shutdown();
return resultFuture.get();
}
private Future<List<ApiResponse>> wrapFetchInFuture(List<String> ids) {
return new FutureTask<>(() -> {
List<ApiResponse> responses = new ArrayList<>();
ids.forEach(id -> {
responses.add(fetchData(id));
});
return responses;
});
}
private ApiResponse fetchData(String id) {
ResponseEntity<ApiResponse> response = restTemplate.getForEntity(id, ApiResponse.class);
log.info(MessageFormat.format("Fetching from {0}", id));
ApiResponse body = response.getBody();
log.info(MessageFormat.format("Retrieved {0}", body));
return body;
}
它不执行,应用程序启动,然后只是挂起。期货没有得到履行。所有建议表示赞赏。PS我知道使用CompletableFuture更容易做到这一点,我只是想知道如何使用Futures做到这一点
解决方案
在问题的原始版本中,您正在创建一个列表,FutureTasks
但从未将它们发送ExecutorService
给运行它们。任务永远不会完成,所以Future.get
永远阻塞。
在问题的更新版本中,您已将执行等待的代码作为任务放入执行程序服务中。FutureTasks 永远不会运行,所以FutureTask.get
仍然会永远阻塞。
我建议您将代码更改fetchAllUsingFuture
为:
List<Callable<List<ApiResponse>>> tasks = new ArrayList<>();
chunks.forEach(chunk -> {
tasks.add(wrapFetchInCallable(chunk));
});
List<Future<List<ApiResponse>>> futures = executorService.invokeAll(tasks);
wherewrapFetchInCallable
创建 aCallable
而不是FutureTask
:
private static Callable<List<ApiResponse>> wrapFetchInCallable(List<String> ids) {
return () -> {
List<ApiResponse> responses = new ArrayList<>();
ids.forEach(id -> {
responses.add(fetchData(id));
});
return responses;
};
}
推荐阅读
- r - 在 R 中循环遍历数据帧的不同子集
- r - 具有单个分类和多个离散/连续变量的条形图
- firebase - 使用 Firebase 托管托管 Google Cloud Run 会引发混合内容错误
- python - 按第一个值对带有元组的列表进行排序
- node.js - 如何部署一个 azure webapp - nodejs
- javascript - 使用 reactjs 切换组件
- r - 无法在 AWS EC2 Ubuntu 实例上安装 httpuv(对于 Shiny)
- artifactory - JFrog Artifactory:使用“记住我”登录功能导致登录失败
- javascript - 响应式导航列表不显示内容
- python - Python将类变量作为对类方法的引用