首页 > 解决方案 > 将 flatMap 与 Completable 一起使用

问题描述

我正在尝试多次调用 API,并传入不同的参数。当没有更多数据返回时,rx 流应该终止。每次调用后,数据都会存储在我的本地存储库中。这是我所拥有的:

val startPositions = BehaviorSubject.createDefault(0)

startPositions.flatMap { startPos -> App.context.repository.getConnections(startPos) }
        .flatMap { connections -> App.context.repository.storeConnections(connections) }
        .doOnNext { startPos -> startPositions.onNext(startPos + 1) }
        .subscribe({ startPos -> println("Index $startPos") })

这是处理下载数据的api方法:

 override fun getConnections(startPos: Int): Observable<List<Connection>> {
            return myAPI.getConnections(startPos)
        }

这是存储数据的api方法:

override fun storeConnections(connections: List<Connection>): Completable =
        Completable.fromAction {
            appDao.storeConnections(connections.map {
                mapper.toDb(it)
            })
        }

我得到的编译错误是:

类型不匹配:推断类型是 (List) -> Completable 但是 ((List) -> ObservableSource!)!预计

如果可能,我不想更改我的 api 调用的返回类型。我也不确定是否使用 flatMap。下载返回的数据是一个列表,我希望该列表在流中保留为列表。我不想发出单个列表项。

标签: kotlinrx-java

解决方案


为了修复类型不匹配错误,请使用 operatorflatMapCompletable而不是flatMap

    .flatMapCompletable { connections -> App.context.repository.storeConnections(connections) }

然后你将不得不使用doOnComplete而不是doOnNext

    .doOnComplete { startPos -> startPositions.onNext(startPos + 1) }

推荐阅读