首页 > 解决方案 > 正确使用 RxJava 移除 Vert.x 回调

问题描述

我们将Vert.xRxJava一起使用。我们的应用程序逻辑正在运行,然而,尽管使用了 RxJava,代码仍然包含嵌套了好几层的回调。下面是一个例子。为了突出代码的结构,所有日志记录和其他逻辑都已删除。

EventBus eb = vertx.eventBus();
// retrieve incoming messages from the consumer by listening on the event bus
MessageConsumer<BusMessage> consumer = eb.<BusMessage>consumer(address);
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable.subscribe(cosumedMsg -> {
  Object msg = cosumedMsg.body().encode());
  Single<Message<MyObject>> step1 = eb.<MyObject>rxSend(addrs1, msg, deliveryOpts));
  step1.subscribe(validationRes -> {
    Single<Message<MyObject>> step2 = eb.rxSend(addrs2, msg, deliveryOpts));
    step2.subscribe(handlerRes -> {
      Single<Message<MyObject>> step3 = eb.rxSend(addrs3, msg, deliveryOpts));
        step3.subscribe(dsRes -> {
          Single<Message<MyObject>> step4 = eb.rxSend(addrs4, msg, deliveryOpts));
            step4.subscribe(msg -> {
              msg.body();
            }, err -> {
            // step1 error handling
          });
        }, err -> {
        // step2 error handling
      });
    }, err -> {
    //  step3 error handling
  });
 }, err -> {
 // step4 error handling
});

很可能我没有正确使用 RxJava。有没有办法使用 RaxJava 的反应特性来重构示例代码以避免嵌套回调?

谢谢

标签: rx-javarx-java2vert.x

解决方案


在我看来,最后你的 Rx 链中应该只有一个subscribe()。您应该使用链来链接您的事件总线调用flatMap()。像这样的东西:

    public void start() throws Exception {
        EventBus eventBus = vertx.eventBus();
        MessageConsumer<BusMessage> consumer = eventBus.consumer("my.address");
        Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
        consumerObservable
                .flatMapSingle(consumerMsg -> {
                    final Object msg = consumerMsg.body().encode();
                    return eventBus.rxSend("addrs1", msg, new DeliveryOptions());
                })
                .flatMapSingle(step1Msg -> eventBus.rxSend("addrs2", step1Msg, new DeliveryOptions()))
                .flatMapSingle(step2Msg -> eventBus.rxSend("addrs3", step2Msg, new DeliveryOptions()))
                .subscribe(this::handleFinalMsg,
                           this::handleException,
                           this::handleOnComplete);
    }

    private void handleFinalMsg(final Message<Object> message) {
        System.out.println(message.body());
    }

    private void handleException(final Throwable throwable) {
        System.err.println(throwable);
    }

    private void handleOnComplete() {
        System.out.println("it's the end!");
    }

任何可能发生的异常都由this::handleException.

您可以查看此博客文章:https ://streamdata.io/blog/vert-x-and-the-async-calls-chain/ (及其相关代码),并且可能位于此代码https:// github.com/ctranxuan/howtos-vertx/tree/master/eventbus-rxjava以获得可能的见解。

我希望这有帮助。


推荐阅读