future - Vertx 中任意数量调用的顺序组合与 Futures
问题描述
我们在 vertx 中使用 Futures,例如:
Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);
fetchVehicle.compose(vehicleJson -> vehicleDoor(routingContext, client, vehicleJson, lock)).setHandler(
asyncResult -> {
if (asyncResult.succeeded()) {
LOG.info("Door operation succeeded with result {}", asyncResult.result().encode());
handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
}
else {
handler.handle(Future.failedFuture(asyncResult.cause()));
}
});
例如,我们处理 2 个呼叫。
或者我有另一个片段可以处理任意数量的方法:
List<Future> futures = new ArrayList<>();
conversation.getRequestList().forEach(req -> {
Future<Message<Object>> senderFuture = Future.future();
vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, JsonObject.mapFrom(req), deliveryOptions, senderFuture.completer());
// sent successfully. save the replyAddress and the conversation for later/callback
log.info("Saving the conversation for the request.", conversation.getReplyAddress());
pendingCommands.put(req.getBody().getString(MSG_ID), conversation);
futures.add(senderFuture);
});
CompositeFuture.all(futures).setHandler(ar -> {
if (ar.succeeded()) {
handler.handle(Future.succeededFuture());
} else {
log.error("forwardToVWClient VW got result : {}", ar.cause());
handler.handle(Future.failedFuture(ar.cause()));
}
});
在这里,我们conversation.getRequestList()
在事先不知道它们的数量的情况下将所有请求链接起来。
但是方法的缺点.all()
是,我们无法控制顺序。
如何使用 Vertx Futures 链接任意数量的方法(不知道调用的确切数量)?
编辑:
官方指南讨论了顺序组合,但给出的示例有 3 个调用。它没有解释如何为任意数量的调用执行此操作。
请参阅http://vertx.io/docs/vertx-core/java/中的“顺序组合”
我希望很清楚。
解决方案
这是一个解决方案map & reduce
,它以有序的方式执行方法并以 a 的形式返回累积的结果Future<String>
public static <T> Future<String> chainCall(List<T> list, Function<T, Future<String>> method){
return list.stream().reduce(Future.succeededFuture(),// the initial "future"
(acc, item) -> acc.compose(v -> method.apply(item)), // we return the compose of the previous "future" with "future" returned by next item processing
(a,b) -> Future.future()); // not used! only useful for parallel stream.
}
可以在下面的例子中使用:
chainCall(conversation.getRequestList(), this::sendApiRequestViaBus);
哪里sendApiRequestViaBus
是:
/**
* @param request The request to process
* @return The result of the request processing.
*/
Future<String> sendApiRequestViaBus(ApiRequest request) {
Future<String> future = Future.future();
String address = CommandUtilsFactory.getInstance(request.getImplementation()).getApiClientAddress();
log.debug("Chain call start msgId {}", request.getId());
vertx.eventBus().send(address, JsonObject.mapFrom(request), deliveryOptions, res -> {
log.debug("Chain call returns {}", request.getId());
if (res.succeeded()) {
future.complete("OK");
} else {
future.fail("KO");
}
});
return future;
}
我希望它有所帮助。
推荐阅读
- deep-learning - 使用暗网 yolov4 训练数据集时,平均损失为 0
- sql - 如何使用变量值编写内联运算符
- javascript - axios获取/发布请求时加载栏
- javascript - 如何在 vuex 状态上设置间隔并从模板中清除它?
- ios - 为什么我的 Text 视图与顶部对齐,但我的 ScrollView 与中心对齐?
- reactjs - 如何获取参数数据?
- flutter - 在 Flutter 中滚动页面时无法保存值
- c# - log4net中的时间戳,是生成日志条目的时间还是写入文件的时间?
- macos - mac终端上字符串的UTF8编码
- python - 创建两个数字之间的列表列表 --- Python