首页 > 解决方案 > 在 Observable/Flowable 中将用于每个发射的重用值放在哪里?

问题描述

我有一些只需要执行一次的昂贵操作(例如加载/下载大文件、加载大型 ML 模型或根据其他一些数据计算优化的数据结构)。我想将它用于 Observable/Flowable 生成的每个值:

以下代码有效,但它在调用heavyProcessing()heavyProcessing2()的线程上运行。就我而言,我无法选择我的调用者线程(它的main线程,因为我使用的是 WorkManager 的 RxWorker,它从 调用 createWork main)。因此,start阻塞了主线程。我如何才能heavyProcessing在 RxJava 的后台执行并且也可用于后续的 RxJava 链?

fun start(): Observable<Unit> {
    val heavy = heavyProcessing() // the heavy value i want to use everywhere!
    val anotherHeavyObject = heavyProcessing2()
    val items = Observable.fromIterable(listOfHundredsOfItems)
             .map { doSomeWork(it, heavy) }
             .map { doSomeWork(it, anotherHeavyObject) }
}

到目前为止,我的尝试没有奏效:

  1. 围绕现有函数创建一个包装器:此代码的问题是start()未观察到由返回的 Observable,因此doSomeWork实际上并没有完成。我只知道这一点,因为我在 at 中设置了断点doSomeWork,并且它永远不会被调用。
    fun startInBackground(): Single<Unit> {
        return Single.fromCallable {
            start()
        }
    }
  1. 我一直在尝试寻找“解除”内部Observable(在内部Single)的方法,因为这可能是这里的问题。内部Observable没有被观察到。

即使在阅读指南之后,这个 RxJava 的东西也很不直观

标签: androidrx-javarx-java2

解决方案


是的,它与Deferred-dependent有关。文档中的示例状态:

有时,前一个序列和新序列之间存在隐含的数据依赖关系,由于某种原因,它没有通过“常规通道”流动。有人会倾向于编写如下的延续:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

实际上,我需要调用者做的就是:

Single.defer { start() }.map { doMoreWork() }

代替

start().map { doMoreWork() }

推荐阅读