首页 > 解决方案 > 单带流动?

问题描述

在 rxJava2 Kotlin 中尝试将 Single 与 Flowable 相结合,但没有发生任何事情:不明白什么问题

  Flowable.create<Int>({ emmit ->

            loadNewListener = object :Listener {
                override fun onEmit(id: Int) {
                    emmit.onNext(id)
                }
            }
        }, BackpressureStrategy.LATEST)
                .debounce(500, TimeUnit.MILLISECONDS)
                .flatMapSingle {
                    loadNew(id = it.id)
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ (data:Data) ->

                }, {

                    Timber.e("Failed load data ${it.message}")
                })

我的方法是返回单:

private fun loadNew(id: Int): Single<Data> {

            return when (pdfType) {

                CASE_0 -> {

                    Single.create<Data> { emmit ->

             service.get("data")
                    .enqueue(
                    object : Callback<Void> {
                         override fun onFailure(call: Call<Void>?, t: Throwable?) {
                            // failure
                        }

                          override fun onResponse(call: Call<Void>?, response:  Response<Void>?) {
                 emmit.onSuccess(it.data)
            }
                        }
                    }//single
                }//case_0


                CASE_1 -> 1Repository.loadsome1Rx(id = id).map { it.getData() }

                CASE_2 -> 2Repository.loadsom2LocalRx(id = id).map { it.getData() }

                else -> {
                    throw java.lang.RuntimeException("$this is not available type!")
                }
            }

我的代码有什么问题?需要像这样在 Flowable subscribe() 中单独调用 Maby 吗?

Flowable.create<Int>({ emmit ->
        loadNewListener = object :Listener {
            override fun onEmit(id: Int) {
                emmit.onNext(id)
            }
        }
    }, BackpressureStrategy.LATEST)
            .debounce(500, TimeUnit.MILLISECONDS)

          .subscribe({
              loadNew(id = it.id)

          }, {
              Timber.e("")
          })

这段代码可以工作,但看起来并不像通过组合尝试那样简单。

标签: kotlinrx-java2

解决方案


这个基于您的代码的简单示例正在运行

var i = 0
fun foo() {
    Flowable.create<Int>({ emmit ->
        emmit.onNext(i)
        i++
    }, BackpressureStrategy.LATEST)
            .debounce(500, TimeUnit.MILLISECONDS)
            .flatMapSingle {
                Single.create<String> { emmit ->
                    emmit.onSuccess("onSuccess: $it")
                }
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                Log.i("RX", "Subscribe: $it")
            }, {
                it.printStackTrace()
            })
}

检查SingleEmitter.onSuccess()SingleEmitter.onError()在所有情况下调用when (pdfType)...


推荐阅读