rx-java2 - 当另一个源进入管道时如何实现调度程序继承?
问题描述
我想将“调度程序继承”作为使用 RxJava2 的 API 的一部分来实现。我希望我的 API 的使用者能够从构建单个处理链而不是 DAG 的角度进行思考,即使在内部,新事件正在作为实现细节被引入。
我看不出有什么方法可以做到:
observable
.flatMap {
val scheduler = Schedulers().current!!
someOtherObservable
.observeOn(scheduler)
}
还有其他方法可以继承调度程序吗?
更多背景
我有一个管道,如:
compositeDisposable += Environment
.lookupDeviceInfo()
.subscribeOn(scheduler)
.flatMap { deviceInfo ->
Device(deviceId = deviceInfo.id)
.sendCommand()
.subscribe(
{ result -> /*process result*/ },
{ e -> /*log error*/ })
对于消费者来说,这看起来像是他们将所有工作都推到了指定的scheduler
: 事件中,从lookupDeviceInfo()
get vectored 到来自该调度程序的工作人员,并且他们希望坚持该工作人员。
实际上,它们有一个错误,因为将sendCommand()
来自另一个事件源的事件作为实现细节:
sendMessageSingle(deviceId, payload)
.flatMap { sentMessageId ->
responseObservable
.filter { it.messageId == sentMessageId }
.firstOrError()
}
事件从 流入responseObservable
,但这些事件都没有被引导到指定的scheduler
,因为它已在上游应用。
解决方案
从评论:
返回到同一个调度程序线程需要您提供一个单线程调度程序(即Schedulers.from(Executor)
、Schedulers.single()
等)。没有当前的调度器,因为不能保证某些代码会在任何标准调度器上运行;它们可以在系统、其他框架等的任意线程上执行。因此,您必须通过 .将信号路由回所需的线程observeOn
。
我不关心登陆同一个线程,只是同一个调度程序。(即使更换 Worker 也可以,只要新的 Worker 由与旧的调度器相同的调度器出售。)
然后该建议仍然适用,您可以放弃我提到的“单线程”属性。
推荐阅读
- arrays - 在 JSON 输出中强制将单个结果作为数组
- android - 如何使用 Retrofit 和 RxJava 捕获 HttpException?
- java - 为什么 Java 数据结构(如数组)似乎不支持向量化操作?
- assembly - 在这种情况下,MOVSX 指令符号如何扩展输入?
- java - 如何在 RecyclerView 中为许多进度条设置进度
- css - 设置 SCSS 颜色变量不起作用
- angular4-forms - 如何将 ngForm 对象传递给我的自定义指令?
- android - AlertDialog 无法从其他方法访问对话框视图
- azure - 对 Azure Data Lake Storage Gen 2 的 REST API 调用不起作用。给我错误“观众验证失败。观众不匹配”
- laravel - 如何在 Laravel 5.8 的 Artisan 命令中使用 ENV 变量?