首页 > 解决方案 > 如何在java中以最有效和最优雅的方式使用并行处理

问题描述

我有不同的数据源,我想从这些数据源中并行请求(因为每个请求都是一个 http 调用,并且可能非常耗时)。但我将只使用来自这些请求的 1 个响应。所以我会优先考虑它们。如果第一个响应无效,我将检查第二个响应。如果它也无效,我想使用第三个,等等。但我想停止处理并在收到第一个正确响应后立即返回结果。

为了模拟这个问题,我创建了以下代码,我试图在其中使用 java 并行流。但问题是我只有在处理完所有请求后才会收到最终结果。

public class ParallelExecution {

    private static Supplier<Optional<Integer>> testMethod(String strInt) {
        return () -> {
            Optional<Integer> result = Optional.empty();
            try {
                result = Optional.of(Integer.valueOf(strInt));
                System.out.printf("converted string %s to int %d\n",
                        strInt,
                        result.orElse(null));
            } catch (NumberFormatException ex) {
                System.out.printf("CANNOT CONVERT %s to int\n", strInt);
            }

            try {
                int randomValue = result.orElse(10000);
                TimeUnit.MILLISECONDS.sleep(randomValue);
                System.out.printf("converted string %s to int %d in %d milliseconds\n",
                        strInt,
                        result.orElse(null), randomValue);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        };
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        System.out.println("Starting program: " + start.toString());
        List<Supplier<Optional<Integer>>> listOfFunctions = new ArrayList();
        for (String arg: args) {
            listOfFunctions.add(testMethod(arg));
        }
        Integer value = listOfFunctions.parallelStream()
                .map(function -> function.get())
                .filter(optValue -> optValue.isPresent()).map(val-> {
                    System.out.println("************** VAL: " + val);
                    return val;
                }).findFirst().orElse(null).get();
        Instant end = Instant.now();
        Long diff = end.toEpochMilli() - start.toEpochMilli();
        System.out.println("final value:" + value + ", worked during " + diff + "ms");
    }
}

因此,当我使用以下命令执行程序时:

$java ParallelExecution dfafj 34 1341 4656 dfad 245df 5767

我想尽快得到结果“34”(大约在 34 毫秒之后),但事实上,我等待了超过 10 秒。

你能帮忙找到解决这个问题的最有效的方法吗?

标签: javajava-8parallel-processingjava-stream

解决方案


ExecutorService#invokeAny看起来是个不错的选择。

List<Callable<Optional<Integer>>> tasks = listOfFunctions
    .stream()
    .<Callable<Optional<Integer>>>map(f -> f::get)
    .collect(Collectors.toList());

ExecutorService service = Executors.newCachedThreadPool();
Optional<Integer> value = service.invokeAny(tasks);

service.shutdown();

我将您的转换List<Supplier<Optional<Integer>>>为 aList<Callable<Optional<Integer>>>以便能够将其传递给invokeAny. Callable您最初可以构建s。然后,我创建了一个ExecutorService并提交了任务。

第一个成功执行的任务的结果将在任务返回结果后立即返回。其他任务最终会中断。

您可能还想查看CompletionService.

List<Callable<Optional<Integer>>> tasks = Arrays
    .stream(args)
    .<Callable<Optional<Integer>>>map(arg -> () -> testMethod(arg).get())
    .collect(Collectors.toList());

final ExecutorService underlyingService = Executors.newCachedThreadPool();
final ExecutorCompletionService<Optional<Integer>> service = new ExecutorCompletionService<>(underlyingService);
tasks.forEach(service::submit);

Optional<Integer> value = service.take().get();
underlyingService.shutdownNow();

推荐阅读