首页 > 解决方案 > Vertx 中任意数量调用的顺序组合与 Futures

问题描述

我们在 vertx 中使用 Futures,例如:

Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);

        fetchVehicle.compose(vehicleJson -> vehicleDoor(routingContext, client, vehicleJson, lock)).setHandler(
                asyncResult -> {
                    if (asyncResult.succeeded()) {
                    LOG.info("Door operation succeeded with result {}", asyncResult.result().encode());
                    handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
                }
                else {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                }
        });

例如,我们处理 2 个呼叫。

或者我有另一个片段可以处理任意数量的方法:

List<Future> futures = new ArrayList<>();
        conversation.getRequestList().forEach(req -> {
            Future<Message<Object>> senderFuture = Future.future();
            vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, JsonObject.mapFrom(req), deliveryOptions, senderFuture.completer());

            // sent successfully. save the replyAddress and the conversation for later/callback
            log.info("Saving the conversation for the request.", conversation.getReplyAddress());
            pendingCommands.put(req.getBody().getString(MSG_ID), conversation);

            futures.add(senderFuture);
        });

        CompositeFuture.all(futures).setHandler(ar -> {
            if (ar.succeeded()) {
                handler.handle(Future.succeededFuture());
            } else {
                log.error("forwardToVWClient VW got result : {}", ar.cause());
                handler.handle(Future.failedFuture(ar.cause()));
            }
        });

在这里,我们conversation.getRequestList()在事先不知道它们的数量的情况下将所有请求链接起来。

但是方法的缺点.all()是,我们无法控制顺序。

如何使用 Vertx Futures 链接任意数量的方法(不知道调用的确切数量)?

编辑:

官方指南讨论了顺序组合,但给出的示例有 3 个调用。它没有解释如何为任意数量的调用执行此操作。

请参阅http://vertx.io/docs/vertx-core/java/中的“顺序组合”

我希望很清楚。

标签: futurevert.x

解决方案


这是一个解决方案map & reduce,它以有序的方式执行方法并以 a 的形式返回累积的结果Future<String>

 public static <T> Future<String> chainCall(List<T> list, Function<T, Future<String>> method){
        return list.stream().reduce(Future.succeededFuture(),// the initial "future"
                (acc, item) -> acc.compose(v -> method.apply(item)), // we return the compose of the previous "future" with "future" returned by next item processing
                (a,b) -> Future.future()); // not used! only useful for parallel stream.
    }

可以在下面的例子中使用:

 chainCall(conversation.getRequestList(), this::sendApiRequestViaBus);

哪里sendApiRequestViaBus是:

/**
     * @param request The request to process
     * @return The result of the request processing. 
     */
    Future<String> sendApiRequestViaBus(ApiRequest request) {
        Future<String> future = Future.future();
        String address = CommandUtilsFactory.getInstance(request.getImplementation()).getApiClientAddress();
        log.debug("Chain call start msgId {}", request.getId());

        vertx.eventBus().send(address, JsonObject.mapFrom(request), deliveryOptions, res -> {
            log.debug("Chain call returns {}", request.getId());
            if (res.succeeded()) {
                future.complete("OK");
            } else {
                future.fail("KO");
            }
        });
        return future;
    }

我希望它有所帮助。


推荐阅读