首页 > 解决方案 > RxJava2:对成功发送到 REST api 的数据执行操作

问题描述

我有这个特定的问题,我找不到一个干净的解决方案。

Observable.interval(1, TimeUnit.SECONDS)
    .flatMap { constructData() }
    .subscribe { data ->
        api.syncData(data)
            .repeat()
            .subscribe { response ->
                deleteSyncedData(data)
            }
    }

因此,从代码中您可以看到,我需要构建一些数据包以将其发送到后端,并且在正确发送后,我将从本地存储系统中删除它。现在它正在工作,但看起来有点像回调地狱 - 如果我想将响应与数据结合起来然后发出请求怎么办?

有没有人看到这种操作的更好解决方案?

另外我想知道如何在这些操作之间切换调度程序?我想执行constructData()并且deleteSyncedData()onSchedulers.computation()但是api.syncData(data)on Schedulers.io()

标签: kotlinrx-javarx-java2

解决方案


来自SubscribeOn 的 Rx 文档

您不能subscribeOn()chain. 从技术上讲,您可以这样做,但这不会产生任何额外的影响。在您的代码中,如果您链​​接两个不同的,仅Schedulers使用subscribeOn()对源关闭的一个observable将生效,而不是其他任何东西。

但是您可以使用observeOn(). Schedulers您可以在多次使用之间切换observeOn(),然后最后observe打开结果MainThread

例子:

Observable.interval(1, TimeUnit.SECONDS)
    .observeOn(Schedulers.computation())
    .flatMap { constructData() }
    .observeOn(Schedulers.io())
    .flatMap { data -> api.syncData(data).map { data } }
    .observeOn(Schedulers.computation())
    .flatMap { data -> deleteSyncedData(data) }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { data ->

    }

但在我看来,这也很好。

Observable.interval(1, TimeUnit.SECONDS)
    .flatMap { constructData() }
    .subscribe { data ->
        api.syncData(data)
            .repeat()
            .subscribe { response ->
                deleteSyncedData(data)
            }
    }

推荐阅读