首页 > 解决方案 > 为什么 Flowable.subscribe(Subscriber) 不返回 Disposable?

问题描述

大多数Flowable.subscribe()重载都返回 a Disposable,这使得流能够被清理。我习惯这样做:

Disposable d = Flowable.just()
    .map(...)
    .subscribe(
        n -> ...
        t -> ...
        () -> ...
    );

// someone clicks "cancel" in another thread
d.dispose();

但是,使用时.subscribe(Subscriber)不会Disposable返回。我想使用.subscribe(Subscriber),所以我可以传入 aTestSubscriber来验证行为。那么在这种情况下我将如何处理流程呢?

我在 Javadoc 中搜索了合适Subscriber的 s。有DisposableSubscriber哪个看起来可行,但有两个问题:

  1. 类描述如下,这表明cancel()不能从流外部使用:

使用受保护的 request(long) 请求更多项目,并使用cancel() 从 onNext 实现中取消序列

  1. TestSubscriber 不扩展 DisposableSubscriber。

标签: javarx-javareactive-programmingrx-java3

解决方案


您可以使用Flowable.subscribeWith(Subscriber)代替subscribe,以便Subscriber返回您的,而不是void

在 RxJava 3.x中TestSubscriber不再实现Disposable. 它确实实现了它所扩展的disposeisDisposed方法,由 定义。BaseTestConsumer但是,这两种方法都已经做了protected,所以你实际上不能直接使用它们。幸运的是,有TestSubscriber.cancel()/ TestSubscriber.isCancelled(),它是公共的,并且等同于dispose()/ isDisposed(),所以你可以使用它们来代替。

至于Flowable.subscribe不返回 a的原因Disposable,这是在 RxJava 2 中进行的,以遵守 Reactive-Streams 规范

由于 Reactive-Streams 规范,Publisher.subscribe返回void...为了解决这个问题,该方法E subscribeWith(E subscriber)已添加到每个基本反应类中,该类按原样返回其输入订阅者/观察者。


推荐阅读