android - RxKotlin - 错误的订阅,observeOn 线程更改主题的活动?
问题描述
我有一个在随机时刻生成不同字符串的对象,我需要订阅这个生成器来获取这些字符串并将它们提供给 ui(也许它将是不同活动中的多个订阅者)。假设,我得到以下代码:
发电机:
class Generator {
private var stringToGenerate = ""
var subject: BehaviorSubject<String> = BehaviorSubject.create<String>()
init {
//seems like these instructions are skipped
subject
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext { t -> Log.i("subject doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
.observeOn(AndroidSchedulers.mainThread())
.map { _ -> Log.i("subject map", Thread.currentThread().name + " " + Thread.currentThread().id) }
//imitation of async creating of strings in separate thread
timer("timerThread", false, 2000L, 2000L) {
stringToGenerate = System.currentTimeMillis().toString()
subject.onNext(stringToGenerate)
}
}
}
必须使用生成的字符串的活动之一:
class TestActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_test)
val wrongThreadObserver = object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.i("wrongThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
}
override fun onError(e: Throwable) {
}
}
val generator = Generator()
generator.subject.subscribe(wrongThreadObserver)
//for correct work illustration
val correctThreadObserver = object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.i("correctThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
}
override fun onError(e: Throwable) {
}
}
val mainThreadSubject = BehaviorSubject.create<String>()
mainThreadSubject
.doOnNext { obj -> Log.i("correctThread doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(correctThreadObserver)
mainThreadSubject.onNext("test thread")
val handler = Handler()
handler.postDelayed({ mainThreadSubject.onNext("test thread 2") }, 1000)
handler.postDelayed({ mainThreadSubject.onNext("test thread 3") }, 2000)
}
}
在这种情况下,仅在活动中创建的correctThreadObserver工作正常,但wrongThreadObserver在计时器线程中保持工作,似乎它忽略了生成器中的指令subscribeOn、ObserveOn、doOnNext,无论这些指令在哪里调用 - 在 init 中,在计时器线程中,在通过从生成器获取对象来进行活动 - wrongThreadObserver仍然在计时器线程中工作。所以日志是:
I/correctThread doOnNext: main 2
I/正确的ThreadObserver:RxNewThreadScheduler-1 941
I/correctThread doOnNext: main 2
I/正确的ThreadObserver:RxNewThreadScheduler-1 941
I/wrongThreadObserver: timerThread 937
I/correctThread doOnNext: main 2
I/正确的ThreadObserver:RxNewThreadScheduler-1 941
I/wrongThreadObserver: timerThread 937
I/wrongThreadObserver: timerThread 937
I/wrongThreadObserver: timerThread 937
没有doOnNext
也没有主线程wrongThreadObserver
我做错了什么?
解决方案
我找到了以下解决方案:我们必须订阅Observable
,而不是订阅Subject
,所以我们必须使用方法的结果observeOn()
,而不是对象“主题”本身。如果需要多次订阅,我们可以将结果缓存observeOn
在单独的变量中:
//in Generator
.......
var observableInSeparateVar = subject.observeOn(AndroidSchedulers.mainThread())
.......
//in TestActivity
.......
generator.observableInSeparateVar.subscribe(wrongThreadObserver)
.......
我们也可以在此之后调用observeOn()
主题并订阅:
//in TestActivity
.......
generator.subject
.observeOn(AndroidSchedulers.mainThread())
.subscribe(wrongThreadObserver)
推荐阅读
- html - Bootstrap 5 - 覆盖导航栏并从顶部推送内容
- azure - 有什么方法可以深度链接到 Microsoft Azure Intelligent Hub 应用程序?
- javascript - 有没有办法在 CSS/HTML/JS 中以三角形/金字塔方式生成六边形?
- sql - 查询优化/重写
- git - 如何让我的网络服务器从我的仓库自动更新?
- python - 错误:关机后无法注册 atexit
- google-oauth - Facebook/Google 登录 API,如何确保用户不是机器人
- python - 如何使用云运行python api从大查询表中读取大数据以及系统配置应该是什么?
- php - Laravel 如何让 Auth 类在不同的模型而不是用户上执行函数
- python - Kivymd:无法在 Scrollview 中添加多个文本字段和按钮