CompletableFuture timeout

Problem description

I only recently started using CompletableFuture, and I have run into a problem: I have N requests to send.

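Each request should be sent to 2 different endpoints, and the results should be compared as JSON. Since I have a large number of requests to send and I don't know how long each one takes, I want to limit the time spent waiting for a result, for example to about 3 seconds.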

So I wrote this test code:

public class MainTest {

   private static final Logger logger = LoggerFactory.getLogger(MainTest.class);
   private Instant start;

   public static void main(String[] args) {

       MainTest main = new MainTest();
       main.start();
   }

   public void start(){
       String req1 = "http://localhost:8080/testing";
       String req2 = "http://127.0.0.1:8095/testing2";

       ExecutorService exec = Executors.newCachedThreadPool();

       start = Instant.now();
       CompletableFuture<String> comp1 = CompletableFuture.supplyAsync(() -> doReq(req1), exec);
       CompletableFuture<String> comp2 = CompletableFuture.supplyAsync(() -> doReq(req2), exec);


       List<CompletableFuture<String>> completables = List.of(comp1,comp2);

       logger.info("Waiting completables");

       CompletableFuture<List<String>> a = allOf(completables);


       List<String> r = new ArrayList<>();
       try {
           r = a.get(3, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (ExecutionException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       }finally {
           Instant end = Instant.now();
           logger.info(" Took: " + DurationFormatUtils.formatDurationHMS(Duration.between(start, end).toMillis()));

           System.out.println(r.size());
           r.forEach(System.out::println);
       }
       exec.shutdown();
   }

   public String doReq(String request){
       AtomicReference<String> response = new AtomicReference<>("default");
       try{
           logger.info("Sending request: {}", request);
           Unirest.get(request).asJson()
                   .ifSuccess(r -> {
                       response.set(r.getBody().toString());
                   })
                   .ifFailure(r -> {
                       logger.error("Oh No! Status" + r.getStatus());
                       r.getParsingError().ifPresent(e -> {
                           logger.error("Parsing Exception: ", e);
                           logger.error("Original body: " + e.getOriginalBody());
                       });
                   });
       } catch (Exception e) {
           logger.error("Error on request! {}", e.getMessage());

       }
      return response.get();
   }


   public <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
       CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
       return allFuturesResult.thenApply(v ->
                       futuresList.stream().
                       map(CompletableFuture::join).
                       collect(Collectors.<T>toList())
       );
   }
}

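The problem arises when any of the requests takes longer than 3 seconds. I want to get the results of the requests that did finish in time. I deliberately delayed one of the requests by 7 seconds, and I get the following output; one of them finished in time, but its result is not in the list: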

2020-12-09T17:05:03,878 [pool-2-thread-2] INFO (MainTest:85) - Sending request: http://127.0.0.1:8095/testing2
2020-12-09T17:05:03,878 [pool-2-thread-1] INFO (MainTest:85) - Sending request: http://localhost:8080/testing
2020-12-09T17:05:03,878 [main] INFO (MainTest:53) - Waiting completables
java.util.concurrent.TimeoutException
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
    at me.testing.MainTest.start(MainTest.java:60)
    at me.testing.MainTest.main(MainTest.java:31)
2020-12-09T17:05:06,889 [main] INFO (MainTest:69) -  Took: 00:00:03.009
0
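
The empty list follows from how the combined future behaves: the future returned by allOf (and anything chained to it with thenApply) completes only once every input has completed, so get(3, TimeUnit.SECONDS) throws TimeoutException and r keeps its initial empty value, even though one input is already done. Below is a minimal sketch that reproduces this without web services. The class name, the sleep durations, and the sleepThenReturn helper are made up for illustration and are not part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AllOfTimeoutRepro {

    // Hypothetical stand-in for doReq: sleeps, then returns a fixed value.
    static String sleepThenReturn(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    // Returns { combined get timed out?, fast future already done? }.
    static boolean[] run() {
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            CompletableFuture<String> fast =
                    CompletableFuture.supplyAsync(() -> sleepThenReturn(100, "fast"), exec);
            CompletableFuture<String> slow =
                    CompletableFuture.supplyAsync(() -> sleepThenReturn(3000, "slow"), exec);
            CompletableFuture<Void> all = CompletableFuture.allOf(fast, slow);

            boolean timedOut = false;
            try {
                all.get(1, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                timedOut = true; // the combined future is still waiting for "slow"
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
            return new boolean[] { timedOut, fast.isDone() };
        } finally {
            exec.shutdownNow(); // interrupt the sleeping task so the JVM can exit
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("combined future timed out: " + result[0]);
        System.out.println("fast future already done:  " + result[1]);
    }
}
```

The individual futures still hold their own results after the timeout, so the completed one can be read directly instead of through the combined future.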

Tags: java, concurrency, task, completable-future, unirest-java

Solution


If the timeout expires, you should collect the values from the futures that have already completed.

It could look like this:

public <T> List<T> getAllCompleted(List<CompletableFuture<T>> futuresList, long timeout, TimeUnit unit) {
  CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
  try {
    allFuturesResult.get(timeout, unit);
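  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
  } catch (ExecutionException | TimeoutException e) {
    // timed out, or one of the futures failed; you may log it
  }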
  return futuresList
    .stream()
    .filter(future -> future.isDone() && !future.isCompletedExceptionally()) // keep only the ones completed
    .map(CompletableFuture::join) // get the value from the completed future
    .collect(Collectors.<T>toList()); // collect as a list
}
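
On Java 9 or later, a possible alternative is to give each future its own deadline with completeOnTimeout, which completes the future with a fallback value if it is still pending when the timeout expires. The sketch below is only an illustration under that assumption: the class name, the TIMED_OUT marker, and the collectWithFallback and sleepThenReturn helpers are hypothetical and not part of the original code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

class FallbackOnTimeout {

    // Hypothetical marker used for requests that did not finish in time.
    static final String TIMED_OUT = "<timed out>";

    // Hypothetical stand-in for doReq: sleeps, then returns a fixed value.
    static String sleepThenReturn(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    // Waits for all futures, substituting `fallback` for any that is still pending
    // at the deadline. Note: completeOnTimeout completes (and returns) the same
    // future instance, so the caller's futures are affected too. join() still
    // throws CompletionException if a future failed exceptionally.
    static <T> List<T> collectWithFallback(List<CompletableFuture<T>> futures, T fallback,
                                           long timeout, TimeUnit unit) {
        List<CompletableFuture<T>> bounded = futures.stream()
                .map(f -> f.completeOnTimeout(fallback, timeout, unit))
                .collect(Collectors.toList());
        return bounded.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    static List<String> demo() {
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            CompletableFuture<String> fast =
                    CompletableFuture.supplyAsync(() -> sleepThenReturn(100, "response1"), exec);
            CompletableFuture<String> slow =
                    CompletableFuture.supplyAsync(() -> sleepThenReturn(3000, "response2"), exec);
            return collectWithFallback(List.of(fast, slow), TIMED_OUT, 1, TimeUnit.SECONDS);
        } finally {
            exec.shutdownNow(); // stop the task that is still sleeping
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Compared with filtering on isDone(), this keeps one entry per request in the original order, which may make it easier to pair up the two endpoint responses for comparison.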

Here is a complete working example. I simply replaced doReq with a sleep because I don't have your web services:

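import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang3.time.DurationFormatUtils;

public class MainTest {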

    private Instant start;

    public static void main(String[] args) {

        MainTest main = new MainTest();
        main.start();
    }

    public void start() {
        String req1 = "http://localhost:8080/testing";
        String req2 = "http://127.0.0.1:8095/testing2";

        ExecutorService exec = Executors.newCachedThreadPool();

        start = Instant.now();
        CompletableFuture<String> comp1 = CompletableFuture.supplyAsync(() -> doReq(req1), exec);
        CompletableFuture<String> comp2 = CompletableFuture.supplyAsync(() -> doReq(req2), exec);

        List<CompletableFuture<String>> completables = List.of(comp1, comp2);

        System.out.println("Waiting completables");

        List<String> r = getAllCompleted(completables, 3, TimeUnit.SECONDS);
        Instant end = Instant.now();
        System.out.println(" Took: " + DurationFormatUtils.formatDurationHMS(Duration.between(start, end).toMillis()));

        System.out.println(r.size());
        r.forEach(System.out::println);
        exec.shutdown();
    }

    public String doReq(String request) {
        if (request.contains("localhost")) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "response1";
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "response2";
    }

    public <T> List<T> getAllCompleted(List<CompletableFuture<T>> futuresList, long timeout, TimeUnit unit) {
        CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
        try {
            allFuturesResult.get(timeout, unit);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return futuresList.stream()
            .filter(future -> future.isDone() && !future.isCompletedExceptionally()) // keep only the ones completed
            .map(CompletableFuture::join) // get the value from the completed future
            .collect(Collectors.<T>toList()); // collect as a list
    }
}
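
One thing to keep in mind with the example above: after the timeout, the slow task keeps running, because exec.shutdown() lets already submitted tasks finish. If the late results are of no use, the stragglers can be cancelled and the workers interrupted. The following is a rough sketch of that idea, not a definitive implementation; the class name and the cancelPending and sleepThenReturn helpers are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CancelStragglers {

    // Hypothetical stand-in for doReq: sleeps, then returns a fixed value.
    static String sleepThenReturn(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    // Hypothetical helper: cancels every future that has not completed yet and
    // returns how many were cancelled. cancel(...) only marks the future as
    // cancelled; per the Javadoc, the mayInterruptIfRunning flag has no effect
    // on CompletableFuture, so the underlying task is not interrupted by it.
    static <T> int cancelPending(List<CompletableFuture<T>> futures) {
        int cancelled = 0;
        for (CompletableFuture<T> f : futures) {
            if (!f.isDone() && f.cancel(true)) {
                cancelled++;
            }
        }
        return cancelled;
    }

    static int demo() {
        ExecutorService exec = Executors.newCachedThreadPool();
        CompletableFuture<String> fast =
                CompletableFuture.supplyAsync(() -> sleepThenReturn(100, "response1"), exec);
        CompletableFuture<String> slow =
                CompletableFuture.supplyAsync(() -> sleepThenReturn(3000, "response2"), exec);
        try {
            CompletableFuture.allOf(fast, slow).get(1, TimeUnit.SECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // expected in this demo: "slow" is still sleeping after one second
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int cancelled = cancelPending(List.of(fast, slow));
        // shutdownNow() interrupts the worker threads, which is what actually ends the sleep.
        exec.shutdownNow();
        return cancelled;
    }

    public static void main(String[] args) {
        System.out.println("cancelled futures: " + demo());
    }
}
```

Whether interrupting the workers actually aborts an in-flight HTTP call depends on the HTTP client, so a request-level timeout on the client itself is worth considering as well.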
