首页 > 解决方案 > RxJava3。为什么 FlowableSubscriber onNext 没有被调用?

问题描述

我需要获取一个 Subscription 对象才能有机会取消订阅听众。为此,我想给 FlowableSubscriber 一个功能。

代码:

FlowableSubscriber fs = new FlowableSubscriber() {
        @Override
        public void onSubscribe(@NonNull Subscription s) {
            System.out.println("Flowable onSubs");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("Flowable onNext");
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Flowable onErr");
        }

        @Override
        public void onComplete() {
            System.out.println("Flowable onComlet");
        }
    };

日志是:

Running...
Flowable onSubs

如果我使用 lambdas 它可以工作,但没有 onSubscribe 回调。

我如何获得订阅以及为什么没有调用这些方法?

标签: javarx-java

解决方案


由于支持背压,您必须通过调用您的方法Flowable来实际控制可以消耗的项目数量,以便它们可以通过以下方式发出:requestSubscriptionFlowable

FlowableSubscriber fs = new FlowableSubscriber() {

        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("Flowable onSubs");
            subscription=s;
            subscription.request(Integer.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
           System.out.println("Flowable onNext");
           //subscription.request(1); you can also request more items from onNext method - it is up to you
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("Flowable onErr");
        }

        @Override
        public void onComplete() {
            System.out.println("Flowable onComlet");
        }
};

在示例Integer.MAX_VALUE中要求订阅,但这可能不是最好的主意。问题是你应该调用Subscription::requestfromonSubscribe来请求初始项目,然后调用它onNext并决定你可以实际处理多少项目。


推荐阅读