java - 等待来自 vertx 中多个可观察对象的响应
问题描述
我正在使用 vertx-rx-java
我有一个处理程序,我需要通过EventBus
这两个请求的响应发出 2 个不同的请求并创建响应。
public handle(RoutingContext context) {
....some code...
Single<Message<Object>> firstRequest = eb.rxSend("address1", "message1");
Single<Message<Object>> secondRequest = eb.rxSend("address2", "message2");
... TODO ...
}
基本上我需要结合两个请求结果并将它们放入RoutingContext
响应中。问题是我不完全理解如何以 rxjava 风格做到这一点。我能想到的唯一方法就是这样:
firstRequest.doOnSuccess(resp1 -> {
secondRequest.doOnSuccess(resp2 -> {
});
});
但我认为这是一个不好的方法,因为如果有 10 个请求而不是 2 个呢?此代码将有 10 个嵌套调用。
有没有更好的方法来组合多个请求结果?
解决方案
zip
运营商可用于关联来自多个源的排放,区别在于它仅在其每个基础源排放时才排放。所以...
- 在有两个基础源的情况下,
zip
将成对发射。 - 在存在三个基础源的情况下,
zip
将以三元组形式发出。 - ...ETC
要亲身体验我的意思,您可以参考RxMarbles页面,并在观察底部流的同时播放前两个流中的排放。
有了这种理解,您可以使用zip运算符来组合Message
回复的结果,如下所示:
Single.zip(firstRequest, secondRequest, (firstReply, secondReply) -> {
// ...do stuff with the replies and compose some result
// to be handled in onSuccess()
return firstReply.body().toString() + secondReply.body().toString();
})
.subscribe(
result -> {
System.out.println("## onSuccess(" + result + ")");
},
error -> {
System.err.println("## onError(" + error.getMessage() + ")");
}
);
如果任一交付失败,onError
则将触发处理程序。onSuccess
否则将被触发。
如果,正如你所提到的,你有大量的请求,你想一次处理,有一个zip的重载变体接受一个Iterable
来源。在您的情况下,这可能看起来像这样:
final List<Single<Message<Object>>> requests = asList(firstRequest, secondRequest, ...);
Single.zip(requests, replies -> {
// ...do stuff with the array of replies
return null;
})
.subscribe(...);
希望有帮助!
推荐阅读
- hawtio - 如何使用 hawtio 编辑/保存 Camel 源 XML?
- apache-spark - spark.sql.cbo.enabled=true 与 Hive 表
- go - Go中是否有等效的静态库?
- ruby-on-rails - Rails:turbolinks:在正文内容更改之前触发加载事件
- c - feof 过早地变为真
- sql - 如果 Column 的值不是 Oracle SQL 中提供的值,则获取该行
- spring-boot - Springboot 应用程序在尝试从 keycloak 访问令牌中获取角色时抛出异常
- java - 如何打印出类中每个对象的所有数据?
- javascript - Kendo DateTimePicker 从本地存储读取后没有时钟
- excel - VBA Excel:循环