首页 > 解决方案 > 我可以从 RxJava 流中通知 BehaviorProcessor 吗?

问题描述

我想获得您对以下代码的反馈。我想知道currentSession.onNext(result.session)SessionManager.signIn流内部调用是否安全。

我的第一个直觉是NO因为多线程和同步问题,这意味着,基于这段代码,我可以currentSession.onNext(result.session)从不同的线程调用。

这是代码,请告诉我您的想法!谢谢

SessionManager 是一个单例

@Singleton
class SessionManager @Inject constructor(
    private val sessionService: SessionService,
){

    val currentSession = BehaviorProcessor.create<Session>()

    fun signIn(login: String, password: String): Single<Boolean> =
        sessionService.signIn(login, password)
            .doOnNext(result -> 
                if (session is Success) {
                   currentSession.onNext(result.session)
                }
            ).map { result ->
                when (result) {
                    is Success -> true
                    else -> false
                }
            }
            .subscribeOn(Schedulers.io())
}

HomeView 是订阅 SessionManager 的登录流的随机视图

class HomeView(val context: Context) : View(context) {

        @Inject
        lateinit var sessionManager: SessionManager

        private val disposables = CompositeDisposable()

        override fun onAttachedToWindow() {
            super.onAttachedToWindow()

            disposables.add(sessionManager.signIn("username", "password")
                .distinctUntilChanged()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { result ->
                    textView.text = if (result) "Success" else "Fail"
                })
        }

        override fun onDetachedFromWindow() {
            super.onDetachedFromWindow()
            disposables.clear()
        }
    }

观察currentSession来自的随机视图SessionManager

class RandomView(val context: Context) : View(context) {

        @Inject
        lateinit var sessionManager: SessionManager

        private val disposables = CompositeDisposable()

        override fun onAttachedToWindow() {
            super.onAttachedToWindow()

            disposables.add(sessionManager.currentSession
                .distinctUntilChanged()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { session -> userTextView.text = session.userName })
        }

        override fun onDetachedFromWindow() {
            super.onDetachedFromWindow()
            disposables.clear()
        }
    }

标签: multithreadingkotlinrx-java2observer-pattern

解决方案


BehaviorProcessor的文档说:

调用 onNext(Object)、offer(Object)、onError(Throwable) 和 onComplete() 需要序列化(从同一线程调用或通过外部序列化方式从不同线程非重叠调用)。所有 FlowableProcessor 都可用的 FlowableProcessor.toSerialized() 方法提供了这样的序列化并且还防止了重入(即,当使用该处理器的下游订阅者也希望递归地调用该处理器上的 onNext(Object) 时)。

所以如果你这样定义它:

val currentSession = BehaviorProcessor.create<Session>().toSerialized()

那么您可以安全地onNext从任何线程调用,它不会导致任何同步问题。

笔记:

我同意处理器的更新应该在 adoOnNext而不是map.

我认为最好使用 aCompletable而不是 a Single<Boolean>,并使用 Rx 错误来指示阻止登录的原因。您还应该在subscribe方法中定义错误处理程序。


推荐阅读