rx-java - 正确使用 RxJava 移除 Vert.x 回调
问题描述
我们将Vert.x与RxJava一起使用。我们的应用程序逻辑正在运行,然而,尽管使用了 RxJava,代码仍然包含嵌套了好几层的回调。下面是一个例子。为了突出代码的结构,所有日志记录和其他逻辑都已删除。
EventBus eb = vertx.eventBus();
// retrieve incoming messages from the consumer by listening on the event bus
MessageConsumer<BusMessage> consumer = eb.<BusMessage>consumer(address);
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable.subscribe(cosumedMsg -> {
Object msg = cosumedMsg.body().encode());
Single<Message<MyObject>> step1 = eb.<MyObject>rxSend(addrs1, msg, deliveryOpts));
step1.subscribe(validationRes -> {
Single<Message<MyObject>> step2 = eb.rxSend(addrs2, msg, deliveryOpts));
step2.subscribe(handlerRes -> {
Single<Message<MyObject>> step3 = eb.rxSend(addrs3, msg, deliveryOpts));
step3.subscribe(dsRes -> {
Single<Message<MyObject>> step4 = eb.rxSend(addrs4, msg, deliveryOpts));
step4.subscribe(msg -> {
msg.body();
}, err -> {
// step1 error handling
});
}, err -> {
// step2 error handling
});
}, err -> {
// step3 error handling
});
}, err -> {
// step4 error handling
});
很可能我没有正确使用 RxJava。有没有办法使用 RaxJava 的反应特性来重构示例代码以避免嵌套回调?
谢谢
解决方案
在我看来,最后你的 Rx 链中应该只有一个subscribe()
。您应该使用链来链接您的事件总线调用flatMap()
。像这样的东西:
public void start() throws Exception {
EventBus eventBus = vertx.eventBus();
MessageConsumer<BusMessage> consumer = eventBus.consumer("my.address");
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable
.flatMapSingle(consumerMsg -> {
final Object msg = consumerMsg.body().encode();
return eventBus.rxSend("addrs1", msg, new DeliveryOptions());
})
.flatMapSingle(step1Msg -> eventBus.rxSend("addrs2", step1Msg, new DeliveryOptions()))
.flatMapSingle(step2Msg -> eventBus.rxSend("addrs3", step2Msg, new DeliveryOptions()))
.subscribe(this::handleFinalMsg,
this::handleException,
this::handleOnComplete);
}
private void handleFinalMsg(final Message<Object> message) {
System.out.println(message.body());
}
private void handleException(final Throwable throwable) {
System.err.println(throwable);
}
private void handleOnComplete() {
System.out.println("it's the end!");
}
任何可能发生的异常都由this::handleException
.
您可以查看此博客文章:https ://streamdata.io/blog/vert-x-and-the-async-calls-chain/ (及其相关代码),并且可能位于此代码https:// github.com/ctranxuan/howtos-vertx/tree/master/eventbus-rxjava以获得可能的见解。
我希望这有帮助。
推荐阅读
- html - 在移动设备中应用 css 属性 hover、ontouch
- javascript - HTML 数据集为空
- c# - 如何在 C# .dll 中包含 .config 文件,该文件将在 vba 项目中用于 ms 访问?
- c# - 使用 XML Reader 在 C#.net 中查找和替换节点内部文本
- windows - 上报Mac地址的更改顺序
- angular - 附加到组件时如何调整 Angular CDK 覆盖背景的大小
- python - 用于数据管道的 Python Pandas 合并、融化、pivot_table
- linux - 从源代码构建某些 GNU 软件时,缺少手册页安装。我怎样才能得到它们?
- php - 基于子数组元素的 PHP usort 数组无法正常工作 - 有问题吗?
- php - 无法在循环内的 PHP 中显示标题