首页 > 解决方案 > 在 doOnNext() 中没有为 Subject 调用 Consumer.accept()

问题描述

我有两个Subject,一个订阅另一个更新。

Subject<Integer> subject = new Subject<>() {
    @Override
    public boolean hasObservers() {
        return false;
    }

    @Override
    public boolean hasThrowable() {
        return false;
    }

    @Override
    public boolean hasComplete() {
        return false;
    }

    @Override
    public Throwable getThrowable() {
        return null;
    }

    @Override
    protected void subscribeActual(Observer<? super InitialAPIResponse> observer) {

    }

    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer result) {
        Log.d(TAG, "onNext: " + apiResponse);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

subject.doOnNext(result -> Log.d("Subject", "accept: " + result));

observableSubject
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(subject);

observableSubject.onNext(1);
observableSubject.onComplete();

onNext()调用 时,提供的Consumer'不是。即使根据文档accept()doOnNext()

Observable.doOnNext()

修改源ObservableSource,以便在调用 时调用操作onNext

调度程序: doOnNext默认情况下不运行在特定的Scheduler

onNext 当源 ObservableSource 调用时调用的动作onNext

返回应用了副作用行为的源 ObservableSource

根据我从文档中了解到的情况,observable 应该调用Consumerin doOnNext()

我正在学习RxJava,所以也许我在这里做错了什么......

标签: javarx-java2

解决方案


有两个问题:

1.

subject.doOnNext(result -> Log.d("Subject", "accept: " + result));

在上面的代码中,结果doOnNext未被订阅。doOnNext与许多其他运营商一样,它不会自己订阅上游。改成这个,例如:

subject.doOnNext(result -> Log.d("Subject", "accept: " + result)).subscribe();

2.

observableSubject
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(subject);

observableSubject.onNext(1);
observableSubject.onComplete();

在上面的代码中,onComplete.onNext. 这可能会在发出值时导致时序问题。

将上面的代码更改为

observableSubject
            .subscribe(subject); // subscribe on the same thread so that everything happens sequentially. 

observableSubject.onNext(1);
observableSubject.onComplete();

或者

Subject<Integer> observableSubject = BehaviorSubject.create();
observableSubject
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(subject);

observableSubject.onNext(1);
// observableSubject.onComplete(); // don't call onComplete/

推荐阅读