首页 > 解决方案 > 当另一个源进入管道时如何实现调度程序继承?

问题描述

我想将“调度程序继承”作为使用 RxJava2 的 API 的一部分来实现。我希望我的 API 的使用者能够从构建单个处理链而不是 DAG 的角度进行思考,即使在内部,新事件正在作为实现细节被引入。

我看不出有什么方法可以做到:

observable
.flatMap {
  val scheduler = Schedulers().current!!
  someOtherObservable
    .observeOn(scheduler)
}

还有其他方法可以继承调度程序吗?

更多背景

我有一个管道,如:

compositeDisposable += Environment
  .lookupDeviceInfo()
  .subscribeOn(scheduler)
  .flatMap { deviceInfo ->
    Device(deviceId = deviceInfo.id)
      .sendCommand()
  .subscribe(
    { result -> /*process result*/ },
    { e -> /*log error*/ })

对于消费者来说,这看起来像是他们将所有工作都推到了指定的scheduler: 事件中,从lookupDeviceInfo()get vectored 到来自该调度程序的工作人员,并且他们希望坚持该工作人员。

实际上,它们有一个错误,因为将sendCommand()来自另一个事件源的事件作为实现细节:

sendMessageSingle(deviceId, payload)
.flatMap { sentMessageId ->
  responseObservable
  .filter { it.messageId == sentMessageId }
  .firstOrError()
}

事件从 流入responseObservable,但这些事件都没有被引导到指定的scheduler,因为它已在上游应用。

标签: rx-java2

解决方案


从评论:

返回到同一个调度程序线程需要您提供一个单线程调度程序(即Schedulers.from(Executor)Schedulers.single()等)。没有当前的调度器,因为不能保证某些代码会在任何标准调度器上运行;它们可以在系统、其他框架等的任意线程上执行。因此,您必须通过 .将信号路由回所需的线程observeOn

我不关心登陆同一个线程,只是同一个调度程序。(即使更换 Worker 也可以,只要新的 Worker 由与旧的调度器相同的调度器出售。)

然后该建议仍然适用,您可以放弃我提到的“单线程”属性。


推荐阅读