首页 > 解决方案 > 如何使用 RxJava2 基于条件执行并行进程

问题描述

我正在尝试在我当前的 Android 项目中使用 RxJava2。

   implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
   implementation 'io.reactivex.rxjava2:rxjava:2.1.12'

我有许多遵循这种模式的用例。

1). Check a condition is true or false.
2). When condition is true
2.1). Execute multiple processes in parallel
2.2). wait until all parallel process have completed.
3). When condition is false.
3.1). Do nothing.

一个这样的用例是我检查我的本地数据库是否需要从远程源刷新。

我已经走了这么远,现在我被困住了......

 refreshRequired().subscribeOn(Schedulers.io()).subscribe(getObserver());

有问题的方法类似于:-

/**
 *
 * @return
 */
private Single<Boolean> refreshRequired() {
    return Single.create(new SingleOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final SingleEmitter<Boolean> emitter) {
            if (emitter.isDisposed()) {

            } else {
                emitter.onSuccess(isRefreshRequired());
            }
        }
    });

}

/**
 *
 * @return
 */
private SingleObserver<Boolean> getObserver() {
    return new SingleObserver<Boolean>() {
        @Override
        public void onSubscribe(final Disposable disposable) {
        }

        @Override
        public void onSuccess(final Boolean value) {
            Log.d(TAG, "onSuccess() called with: value = [" + value + "]");

        }

        @Override
        public void onError(final Throwable throwable) {
            Log.e(TAG, "onError() called with: ", throwable);
        }
    };
}

我无法理解的是当public void onSuccess(final Boolean value) {}方法接收到值时如何触发我的并行进程true

这个过程的伪代码是:-

When refreshRequired
  Refresh Data A
  Refresh Data B
  Refresh Data C
  Stop
Otherwise
  Stop

标签: androidparallel-processingrx-java2

解决方案


您可以通过使用filter运算符将​​反应流分成两部分来实现:

Observable<Boolean> refreshRequiredObservable = refreshRequired();

refreshRequiredObservable.filter(refreshRequired -> refreshRequired)
        .doOnNext(aBoolean -> System.out.println("Do first thing"))
        .doOnNext(aBoolean -> System.out.println("Do second thing"))
        .doOnNext(aBoolean -> System.out.println("Do third thing"))
        .subscribe();

refreshRequiredObservable.filter(refreshRequired -> !refreshRequired)
        .doOnNext(aBoolean -> System.out.println("Do nothing"))
        .subscribe();

推荐阅读