首页 > 解决方案 > 等待来自 vertx 中多个可观察对象的响应

问题描述

我正在使用 vertx-rx-java

我有一个处理程序,我需要通过EventBus这两个请求的响应发出 2 个不同的请求并创建响应。

public handle(RoutingContext context) {
....some code...    

    Single<Message<Object>> firstRequest = eb.rxSend("address1", "message1");
    Single<Message<Object>> secondRequest = eb.rxSend("address2", "message2");
    ... TODO ...
}

基本上我需要结合两个请求结果并将它们放入RoutingContext响应中。问题是我不完全理解如何以 rxjava 风格做到这一点。我能想到的唯一方法就是这样:

firstRequest.doOnSuccess(resp1 -> {
  secondRequest.doOnSuccess(resp2 -> {

  });
});

但我认为这是一个不好的方法,因为如果有 10 个请求而不是 2 个呢?此代码将有 10 个嵌套调用。

有没有更好的方法来组合多个请求结果?

标签: javarx-javavert.x

解决方案


zip运营商可用于关联来自多个源的排放,区别在于它仅在其每个基础源排放时才排放。所以...

  • 在有两个基础源的情况下,zip将成对发射。
  • 在存在三个基础源的情况下,zip将以三元组形式发出。
  • ...ETC

要亲身体验我的意思,您可以参考RxMarbles页面,并在观察底部流的同时播放前两个流中的排放。

有了这种理解,您可以使用zip运算符来组合Message回复的结果,如下所示:

Single.zip(firstRequest, secondRequest, (firstReply, secondReply) -> {
    // ...do stuff with the replies and compose some result
    //    to be handled in onSuccess()
    return firstReply.body().toString() + secondReply.body().toString();
})
.subscribe(
    result -> {
        System.out.println("## onSuccess(" + result + ")");
    },
    error -> {
        System.err.println("## onError(" + error.getMessage() + ")");
    }
);

如果任一交付失败,onError则将触发处理程序。onSuccess否则将被触发。

如果,正如你所提到的,你有大量的请求,你想一次处理,有一个zip的重载变体接受一个Iterable来源。在您的情况下,这可能看起来像这样:

final List<Single<Message<Object>>> requests = asList(firstRequest, secondRequest, ...);

Single.zip(requests, replies -> {
    // ...do stuff with the array of replies
    return null;
})
.subscribe(...);

希望有帮助!


推荐阅读