首页 > 解决方案 > 如何在 rxjava 中加入多个可观察对象

问题描述

如何加入多个不同的可观察对象并从视图模型订阅?

我使用的是单一事实源原则,所以首先我从 db 获取数据,然后从 webservice 加载数据,最后将所有数据保存到 db。

为此,我使用了 rxjava、room、dagger2、retrofit 库。但是出现了一些问题。我必须从 web 服务获取多个列表并将每个列表保存到数据库。我尝试了一些解决方案,但此代码多次回复相同的请求。进度条每次都会改变。我怎样才能简化?最佳实践。

api.json

{
"data": {
        "ad": [
            {
                "id": 11,
                "image": "ad/ru/msG0y8vuXl.png"
            }
            ...
        ],
        "categories": [...],
        "status":     [...],
        "location":   [...]
        }
}

HomeRepository.kt

class HomeRepository @Inject constructor(
    private val indexApi: IndexApi, 
    private val categoryDao: CategoryDao, 
    private val userDao: UserDao, 
    private val adDao: AdDao
) {

    fun getCategoryList(): Observable<List<Category>> {
        val categoryListDb: Observable<List<Category>> = categoryDao.getCategoryList()
            .filter { t: List<Category> -> t.isNotEmpty() }
            .subscribeOn(Schedulers.computation())
            .toObservable()

        val categoryListApi: Observable<List<Category>> = indexApi.getIndex()
            .toObservable()
            .map { response ->
                Observable.create { subscriber: ObservableEmitter<Any> ->
                    categoryDao.insertCategoryList(response.data.categories)
                    subscriber.onComplete()
                }
                    .subscribeOn(Schedulers.computation())
                    .subscribe()
                response.data.categories
            }
            .subscribeOn(Schedulers.io())

        return Observable
            .concatArrayEager(categoryListDb, categoryListApi)
            .observeOn(AndroidSchedulers.mainThread())
    }

    fun getUserList(): Observable<List<User>> {
        // same as above
    }

    fun getAdList(): Observable<List<Ad>> {
        // same as above
    }
}

HomeViewmodel.kt

class HomeViewModel @Inject constructor(
    private val homeRepository: HomeRepository
) : BaseViewModel() {

    private val categoryLiveData: MutableLiveData<Resource<List<Category>>> = MutableLiveData()
    private val adLiveData: MutableLiveData<Resource<List<Ad>>> = MutableLiveData()
    private val userLiveData: MutableLiveData<Resource<List<User>>> = MutableLiveData()

    fun categoryResponse(): LiveData<Resource<List<Category>>> = categoryLiveData
    fun adResponse(): LiveData<Resource<List<Ad>>> = adLiveData
    fun userResponse(): LiveData<Resource<List<User>>> = userLiveData

    fun loadCategory() {
        categoryLiveData.postValue(Resource.loading())
        compositeDisposable.add(
            homeRepository.getCategoryList()
                .subscribe({ response ->
                    categoryLiveData.postValue(Resource.succeed(response))
                }, { error ->
                    categoryLiveData.postValue(Resource.error(error))
                })
        )
    }
    fun loadAd() { // Same as above }
    fun loadUser() { // Same as above }
}

HomeFragment.kt

fun init(){
    // ..
    viewmodel.loadCategory()
    viewmodel.adResponse()
    viewmodel.userResponse()

    viewmodel.categoryResponse().observe(this, Observer {
        when(it.status){
        Status.SUCCEED -> { progressBar.toGone() }
        Status.LOADING -> { progressBar.toVisible() }
        Status.FAILED  -> { progressBar.toGone() }
        }
    }
    viewmodel.adResponse().observe(this, Observer { //Same as above }
    viewmodel.userResponse().observe(this, Observer { //Same as above }
}

标签: androidkotlinrx-javarx-java2

解决方案


您应该能够通过包装indexApi.getIndex().toObservable()在可连接的 observable 中来防止代码中发生多次调用。

这是一个更高级的主题,但大致你需要做的是:

在您的班级中创建一个字段HomeRepository

private val observable = Observable.defer {
  indexApi.getIndex().toObservable()
}.replay(1).refCount()

然后,您需要替换每次使用indexApi.getIndex().toObservable()with observable

这可能无法完全达到您预期的结果。这篇博文似乎是对其他可能选项的描述:https ://blog.danlew.net/2016/06/13/multicasting-in-rxjava/


推荐阅读