首页 > 解决方案 > 未抛出 RxJava MissingBackpressureException

问题描述

我们在我们的 Android 应用程序中使用了大量的 RxJava1 代码。最近,我们开始得到很多东西MissingBackpressureException。所以我试着更好地理解背压的机制。

我能够得到一个背压异常被抛出

BehaviorSubject<Integer> subject = BehaviorSubject.create();

subject
    .observeOn(Schedulers.computation())
    .subscribe(x -> {
        try {
            logger.info("got " + x);
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
});

for (int i = 0; true; ++i) {
    logger.info("sending " + i);
    subject.onNext(i);
}

太好了,我得到了 a MissingBackpressureException,但是当我导致subscribe动作永远不会返回时,我不再得到MissingBackpressureException,所以这段代码:

subject
    .observeOn(Schedulers.computation())
    .subscribe(x -> {
        while(true) {
            try {
                logger.info("got " + x);
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
});

所以我在这里有几个问题:

  1. 为什么我没有MissingBackpressureException加入第二个订阅者?
  2. 所有对象会发生什么?我没有看到内存占用越来越大,所以我假设它们被扔掉了?为什么?
  3. 当我尝试RxJava2通过添加toFlowable(BackpressureStrategy.ERROR)到订阅者来做到这一点时,在任何一种情况下我都没有遇到异常,这里发生了什么?

        subject
            .observeOn(Schedulers.computation())
            .toFlowable(BackpressureStrategy.ERROR)
            .subscribe(x -> {
    

标签: androidrx-javarx-java2

解决方案


推荐阅读