首页 > 解决方案 > RxJava 2.x - 由 PublishSubject 触发并与其他 Observable 合并的 Observable 获取 flatMap 不会被订阅/执行

问题描述

我有一个分页解决方案,使用PublishSubject如下所示:

    private val pages: PublishSubject<Int> = PublishSubject.create()
    val observable: Observable<List<Data> = pages.hide()
        .filter { !inFlight }
        .doOnNext { inFlight = true }
        .flatMap{
            getPage(it) // Returns an Observable
        }
        .doOnNext(::onNextPage) // inFlight gets reset here

Observable是与其他的合并和扫描的,Observable如下所示:

    fun stateObservable(): Observable<SavedState> {
        return Observable.merge(listOf(firstPage(),
            nextPage(),// The observable listed above
            refresh()))
            .scan(MyState.initialState(), StateReducer::reduce)
    }

基本上我有一个单向设置,在累加器功能的帮助下,每个可观察到的更新MyState及其相关更改reduce

ViewModel这种情况下,这是以直接的方式消耗的:

        interactor.stateObservable()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeBy(onNext = ::render, onError = Timber::e)
            .addTo(subscriptions)

此设置适用于firstPage以及refresh(也在 的帮助下触发PublishSubject),但由于某种原因,分页解决方案得到了作为返回的getPage ObservableflatMap但是这个页面Observable永远不会被触发/订阅,并且doOnNext之后flatMap显然没有得到称为。似乎它基本上不想订阅它,我根本不知道为什么。

getPage函数如下所示:

    private fun getPage(page: Long): Observable<PartialState<SavedState>> {
        return repo.getPage(page).firstOrError().toObservable()
            .subscribeOn(Schedulers.io())
            .map<PartialState<MyState>> { NextPageLoaded(it) }
            .onErrorReturn { NextPageError(it) }
            .startWith { NextPageLoading() }
    }

在repo中,通过以下方式getPage将 RxJava 1 转换为 RxJava2 ObservableObservableRxJavaInterop

    public io.reactivex.Observable<List<Data>> getPage(long page) {
        Observable<List<Data>> observable = getPage(page)
                .map(dataList -> {
                    if(dataList == null){
                        dataList = new ArrayList<>();
                    }
                    return dataList;
                });

        return RxJavaInterop.toV2Observable(observable);
    }

我没有收到任何错误,所以你可以排除它。

我已经有了与 RxJava 1 相同的设置,它运行良好,现在当我迁移到 2.x 时,我期待相同的解决方案能够工作,但我完全陷入了这个分页问题和所有其他场景设置按预期工作。

为了能够测试该问题,我在GitHub 上上传了一个示例项目来演示该问题。

有任何 RxJava 专家知道它可能是什么吗?:)

谢谢

标签: androidrx-java2

解决方案


我发现了一个问题{}startWithnextPageObservable.

.startWith { NextPageLoading() }

反而:

.startWith ( NextPageLoading() )

但是,通过此更改,您的代码会由于其他地方的 null 值而产生 IAE:

java.lang.IllegalArgumentException: Parameter specified as non-null is null: method kotlin.jvm.internal.Intrinsics.checkParameterIsNotNull, parameter page
    at com.paginationissue.ui.list.PaginationIssueInteractor$1.invoke(Unknown Source:2)
    at com.paginationissue.ui.list.PaginationIssueInteractor$1.invoke(PaginationIssueInteractor.kt:15)
    at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:21)
    at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:12)
    at com.paginationissue.paging.Pager.onNextPage(Pager.kt:92)

推荐阅读