首页 > 解决方案 > RxJava Flowable.create(),如何尊重 subscribeOn() 线程

问题描述

我将自定义库 ( dataClient) 回调 api 包装到 RxJava Flowable。使用dataClient它自己的线程,所以它的回调是在它自己的线程上调用的。

在我的 Rx 链中,我尝试使用.subscribeOn(Schedulers.computation()). 不过,当我在我的 Rx 链上打印线程名称时,我得到了我的dataClient线程。

我应该怎么做,让我的 Flowable 使用指定的线程.subscribeOn()

Flowable.create({ emitter ->
    dataClient.setCallback(object : Callback {
        override fun message(message: DataModel) {
            emitter.onNext(vehicle)
        }

        override fun done() {
            emitter.onComplete()
        }
    })
    emitter.setCancellable {
        dataClient.setCallback(null)
    }
}, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .doOnNext { Log.e("DATA", Thread.currentThread().name) }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { data -> Log.d("DATA", "Got data" + data.id)) }

标签: javaandroidkotlinrx-java2

解决方案


subscribeOn调度程序确保订阅在相关线程上完成。订阅完全是发生的,它的处理方式与observeOn调度程序不同,调度程序在新线程上调度元素的发射。

Flowable.create({ emitter ->
    // this runs with the computation scheduler
    dataClient.setCallback(object : Callback {
        override fun message(message: DataModel) {
            // this runs on the thread it's called from
            emitter.onNext(vehicle)
        }

        override fun done() {
            // this runs on the thread it's called from
            emitter.onComplete()
        }
    })
    emitter.setCancellable {
        dataClient.setCallback(null)
    }
}, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .doOnNext {
        // this runs on the thread of the onNext call
        Log.e("DATA", Thread.currentThread().name)
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        // this runs on the main thread
        data -> Log.d("DATA", "Got data" + data.id))
    }

由于您的订阅代码没有阻塞并且不维护发射线程,subscribeOn因此不需要设置并且可以省略。它主要只对同步源有效。


推荐阅读