首页 > 解决方案 > 以递归方式使用 java 8 CompletableFuture

问题描述

我需要从数据库加载记录,然后在链中对它们执行一些操作,我想重构我们现有的代码以利用 CompletableFuture API

基本思想是

  1. a. 将记录加载到队列中 b. 对于队列中的每个记录,请执行以下操作:
    1. 执行任务 A(需要一段时间)
    2. 基于任务 A 执行任务 B(需要更多时间)
    3. 执行任务 C(也需要时间)

一些记录当然比其他记录结束得更快,所以我希望一旦完成,另一个(来自队列)将开始执行

我有以下代码

gamesQueue = new PriorityQueue<Game>();
completable = new LinkedList<CompletableFuture<String>>();
Instant start = Instant.now();
Executor pool = Executors.newCachedThreadPool();
DB.getInstance().getConn().useHandle(handle -> {
   handle.attach(GameService.class).list().stream().forEach(game -> gamesQueue.add(game));
});

这将加载我的记录然后我执行以下操作

while (!gamesQueue.isEmpty()) {

        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(this::getCurrentGame, pool)
                .thenApply(CapitalLetterTask::capitalize).thenApply(game-> {
                    if(game.isEndedExceptionaly()) {
                        handleNumber5Exception(game);
                        throw new CompletionException(new NumberFiveException(game, "got number 5"));
                    }
                    return game;
                })
                .thenApply(AddStarTask::addStarToGame).exceptionally(ExceptionHandler::handleWeirdEx);
        completable.add(step1);

    }


    while (!completable.isEmpty()) {
        CompletableFuture<String> completed = completable.poll();
        if (completed.isDone()) {
            try {
                log.debug("result: {}", completed.get());
            } catch (Exception ex) {

            }

        } else if(completed.isCompletedExceptionally()) {
            try{ //never enters here
                log.debug("This is a bad result:{} ", completed.get());
            }catch(Exception ex) {

            }

        }
         else {
            completable.add(completed);
        }
    }
    Instant end = Instant.now();
    Duration duration = Duration.between(start, end);
    log.debug("Total time: {}", duration.getSeconds());
    loadGames();  <-- call itself
}

我不满足于我的异常处理此外我看到即使是应该在第一阶段被捕获的期货仍在进一步传递

我想问的是如何以这样的方式实现它,即每个方法都可以抛出自己的自定义异常,被捕获和处理,而无需将其进一步向下传递

标签: javajava-8functional-programmingcompletable-future

解决方案


推荐阅读