java - RxJava Flowable.create(),如何尊重 subscribeOn() 线程
问题描述
我将自定义库 ( dataClient
) 回调 api 包装到 RxJava Flowable。使用dataClient
它自己的线程,所以它的回调是在它自己的线程上调用的。
在我的 Rx 链中,我尝试使用.subscribeOn(Schedulers.computation())
. 不过,当我在我的 Rx 链上打印线程名称时,我得到了我的dataClient
线程。
我应该怎么做,让我的 Flowable 使用指定的线程.subscribeOn()
?
Flowable.create({ emitter ->
dataClient.setCallback(object : Callback {
override fun message(message: DataModel) {
emitter.onNext(vehicle)
}
override fun done() {
emitter.onComplete()
}
})
emitter.setCancellable {
dataClient.setCallback(null)
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.doOnNext { Log.e("DATA", Thread.currentThread().name) }
.observeOn(AndroidSchedulers.mainThread())
.subscribe { data -> Log.d("DATA", "Got data" + data.id)) }
解决方案
subscribeOn
调度程序确保订阅在相关线程上完成。订阅完全是发生的,它的处理方式与observeOn
调度程序不同,调度程序在新线程上调度元素的发射。
Flowable.create({ emitter ->
// this runs with the computation scheduler
dataClient.setCallback(object : Callback {
override fun message(message: DataModel) {
// this runs on the thread it's called from
emitter.onNext(vehicle)
}
override fun done() {
// this runs on the thread it's called from
emitter.onComplete()
}
})
emitter.setCancellable {
dataClient.setCallback(null)
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.doOnNext {
// this runs on the thread of the onNext call
Log.e("DATA", Thread.currentThread().name)
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// this runs on the main thread
data -> Log.d("DATA", "Got data" + data.id))
}
由于您的订阅代码没有阻塞并且不维护发射线程,subscribeOn
因此不需要设置并且可以省略。它主要只对同步源有效。
推荐阅读
- angular - Angular:如何从服务共享价值到整个项目 Angular 11?
- c# - 在 .NET Core 中将请求服务对象作为 null 传递
- latex - 如何在乳胶中将图片标题设置为局部小?
- c++ - 没有原子的 C++ 线程安全
- c# - 如何在 C# 中测试内部调用公共实例方法的方法
- android-studio - Android现金:UnsatisfiedLinkError:dlopen失败:找不到符号“pthread_cond_clockwait”
- instagram - Instagram Business Discovery API 无效的 OAuth 访问令牌
- datetime - 如何解析 PRTG 使用的时间戳
- typescript - 使用 reduce() 和 Typescript 解析嵌套对象
- typescript - Angular - onfig.service.ts:11:1 - 错误 TS2532:对象可能是“未定义”