multithreading - 我可以从 RxJava 流中通知 BehaviorProcessor 吗?
问题描述
我想获得您对以下代码的反馈。我想知道currentSession.onNext(result.session)
从SessionManager.signIn
流内部调用是否安全。
我的第一个直觉是NO
因为多线程和同步问题,这意味着,基于这段代码,我可以currentSession.onNext(result.session)
从不同的线程调用。
这是代码,请告诉我您的想法!谢谢
SessionManager 是一个单例
@Singleton
class SessionManager @Inject constructor(
private val sessionService: SessionService,
){
val currentSession = BehaviorProcessor.create<Session>()
fun signIn(login: String, password: String): Single<Boolean> =
sessionService.signIn(login, password)
.doOnNext(result ->
if (session is Success) {
currentSession.onNext(result.session)
}
).map { result ->
when (result) {
is Success -> true
else -> false
}
}
.subscribeOn(Schedulers.io())
}
HomeView 是订阅 SessionManager 的登录流的随机视图
class HomeView(val context: Context) : View(context) {
@Inject
lateinit var sessionManager: SessionManager
private val disposables = CompositeDisposable()
override fun onAttachedToWindow() {
super.onAttachedToWindow()
disposables.add(sessionManager.signIn("username", "password")
.distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread())
.subscribe { result ->
textView.text = if (result) "Success" else "Fail"
})
}
override fun onDetachedFromWindow() {
super.onDetachedFromWindow()
disposables.clear()
}
}
观察currentSession
来自的随机视图SessionManager
class RandomView(val context: Context) : View(context) {
@Inject
lateinit var sessionManager: SessionManager
private val disposables = CompositeDisposable()
override fun onAttachedToWindow() {
super.onAttachedToWindow()
disposables.add(sessionManager.currentSession
.distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread())
.subscribe { session -> userTextView.text = session.userName })
}
override fun onDetachedFromWindow() {
super.onDetachedFromWindow()
disposables.clear()
}
}
解决方案
BehaviorProcessor的文档说:
调用 onNext(Object)、offer(Object)、onError(Throwable) 和 onComplete() 需要序列化(从同一线程调用或通过外部序列化方式从不同线程非重叠调用)。所有 FlowableProcessor 都可用的 FlowableProcessor.toSerialized() 方法提供了这样的序列化并且还防止了重入(即,当使用该处理器的下游订阅者也希望递归地调用该处理器上的 onNext(Object) 时)。
所以如果你这样定义它:
val currentSession = BehaviorProcessor.create<Session>().toSerialized()
那么您可以安全地onNext
从任何线程调用,它不会导致任何同步问题。
笔记:
我同意处理器的更新应该在 adoOnNext
而不是map
.
我认为最好使用 aCompletable
而不是 a Single<Boolean>
,并使用 Rx 错误来指示阻止登录的原因。您还应该在subscribe
方法中定义错误处理程序。
推荐阅读
- c# - Blueprism - 在电子邮件正文中突出显示收集列
- php - 将 Symfony 2.8 升级到 3.4 时出现 appKernel 错误
- python - 如何用“。”剥离字符串 在Python中正确吗?
- sql-server - 对先前插入进行条件更新的插入触发器
- json - 我应该如何解析这个 json 对象以响应生命周期方法?
- database - 在撤消数据库“PROD_Search_Service_LinksStore”中记录的操作期间,日志记录 ID 发生错误
- html - Meta 标签上的 Thymeleaf 功能
- javascript - svg.js:水平偏移换行 tspan 以创建交错外观
- html - 使用 Angular 4 在新选项卡中打开静态网页
- c# - DataGridView 更改单元格背景并恢复默认样式