java - 如何在java中以最有效和最优雅的方式使用并行处理
问题描述
我有不同的数据源,我想从这些数据源中并行请求(因为每个请求都是一个 http 调用,并且可能非常耗时)。但我将只使用来自这些请求的 1 个响应。所以我会优先考虑它们。如果第一个响应无效,我将检查第二个响应。如果它也无效,我想使用第三个,等等。但我想停止处理并在收到第一个正确响应后立即返回结果。
为了模拟这个问题,我创建了以下代码,我试图在其中使用 java 并行流。但问题是我只有在处理完所有请求后才会收到最终结果。
public class ParallelExecution {
private static Supplier<Optional<Integer>> testMethod(String strInt) {
return () -> {
Optional<Integer> result = Optional.empty();
try {
result = Optional.of(Integer.valueOf(strInt));
System.out.printf("converted string %s to int %d\n",
strInt,
result.orElse(null));
} catch (NumberFormatException ex) {
System.out.printf("CANNOT CONVERT %s to int\n", strInt);
}
try {
int randomValue = result.orElse(10000);
TimeUnit.MILLISECONDS.sleep(randomValue);
System.out.printf("converted string %s to int %d in %d milliseconds\n",
strInt,
result.orElse(null), randomValue);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
};
}
public static void main(String[] args) {
Instant start = Instant.now();
System.out.println("Starting program: " + start.toString());
List<Supplier<Optional<Integer>>> listOfFunctions = new ArrayList();
for (String arg: args) {
listOfFunctions.add(testMethod(arg));
}
Integer value = listOfFunctions.parallelStream()
.map(function -> function.get())
.filter(optValue -> optValue.isPresent()).map(val-> {
System.out.println("************** VAL: " + val);
return val;
}).findFirst().orElse(null).get();
Instant end = Instant.now();
Long diff = end.toEpochMilli() - start.toEpochMilli();
System.out.println("final value:" + value + ", worked during " + diff + "ms");
}
}
因此,当我使用以下命令执行程序时:
$java ParallelExecution dfafj 34 1341 4656 dfad 245df 5767
我想尽快得到结果“34”(大约在 34 毫秒之后),但事实上,我等待了超过 10 秒。
你能帮忙找到解决这个问题的最有效的方法吗?
解决方案
ExecutorService#invokeAny
看起来是个不错的选择。
List<Callable<Optional<Integer>>> tasks = listOfFunctions
.stream()
.<Callable<Optional<Integer>>>map(f -> f::get)
.collect(Collectors.toList());
ExecutorService service = Executors.newCachedThreadPool();
Optional<Integer> value = service.invokeAny(tasks);
service.shutdown();
我将您的转换List<Supplier<Optional<Integer>>>
为 aList<Callable<Optional<Integer>>>
以便能够将其传递给invokeAny
. Callable
您最初可以构建s。然后,我创建了一个ExecutorService
并提交了任务。
第一个成功执行的任务的结果将在任务返回结果后立即返回。其他任务最终会中断。
您可能还想查看CompletionService
.
List<Callable<Optional<Integer>>> tasks = Arrays
.stream(args)
.<Callable<Optional<Integer>>>map(arg -> () -> testMethod(arg).get())
.collect(Collectors.toList());
final ExecutorService underlyingService = Executors.newCachedThreadPool();
final ExecutorCompletionService<Optional<Integer>> service = new ExecutorCompletionService<>(underlyingService);
tasks.forEach(service::submit);
Optional<Integer> value = service.take().get();
underlyingService.shutdownNow();
推荐阅读
- angular - 如何在视图中最好地订阅 NGRX 状态变化?
- mysql - 如何修复 MySql 工作台中创建模式的“错误解析 DDL”?
- java - 如何在 Validator 类中使用注解?
- apache - 如何使用多个 url 变量?
- intellij-idea - 如何修复错误:RuntimeException Unmatched delimiter:)
- qemu - 在 Gem5 全系统模拟 x86 上使用 Gentoo linux 编译和运行 C 程序
- python - 尝试在循环下打印多个变量
- regex - RegEx 用于验证具有单字母域的电子邮件
- php - 如何连接两个表,然后在 codeigniter 中更新?
- r - 有没有办法将索引列添加到当前数据框?