android - 在 Observable/Flowable 中将用于每个发射的重用值放在哪里?
问题描述
我有一些只需要执行一次的昂贵操作(例如加载/下载大文件、加载大型 ML 模型或根据其他一些数据计算优化的数据结构)。我想将它用于 Observable/Flowable 生成的每个值:
以下代码有效,但它在调用heavyProcessing()
者heavyProcessing2()
的线程上运行。就我而言,我无法选择我的调用者线程(它的main
线程,因为我使用的是 WorkManager 的 RxWorker,它从 调用 createWork main
)。因此,start
阻塞了主线程。我如何才能heavyProcessing
在 RxJava 的后台执行并且也可用于后续的 RxJava 链?
fun start(): Observable<Unit> {
val heavy = heavyProcessing() // the heavy value i want to use everywhere!
val anotherHeavyObject = heavyProcessing2()
val items = Observable.fromIterable(listOfHundredsOfItems)
.map { doSomeWork(it, heavy) }
.map { doSomeWork(it, anotherHeavyObject) }
}
到目前为止,我的尝试没有奏效:
- 围绕现有函数创建一个包装器:此代码的问题是
start()
未观察到由返回的 Observable,因此doSomeWork
实际上并没有完成。我只知道这一点,因为我在 at 中设置了断点doSomeWork
,并且它永远不会被调用。
fun startInBackground(): Single<Unit> {
return Single.fromCallable {
start()
}
}
- 我一直在尝试寻找“解除”内部
Observable
(在内部Single
)的方法,因为这可能是这里的问题。内部Observable
没有被观察到。
解决方案
是的,它与Deferred-dependent有关。文档中的示例状态:
有时,前一个序列和新序列之间存在隐含的数据依赖关系,由于某种原因,它没有通过“常规通道”流动。有人会倾向于编写如下的延续:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
实际上,我需要调用者做的就是:
Single.defer { start() }.map { doMoreWork() }
代替
start().map { doMoreWork() }
推荐阅读
- nginx - Nginx - 位置指令中的参数数量无效 - 正则表达式捕获带有空格的组
- react-native - React native:Ios 上的错误,但在 android 上没有
- nim-lang - Nim 是否支持自动投射?
- java - 使用未在数据集中提交的 SPARQL 插入数据 Jena ARQ
- wpf - 在.Net Core中添加值后如何刷新WPF数据网格
- wordpress - 如何在跨域(iframe)和同源场景中登录wordpress?
- wordpress - 条纹 Wordpress 发票
- cygwin - 我想在 Cygwin64 上构建 cgreen
- ios - UniversalLink 不适用于 iOS 14 设备,但在模拟器上运行良好
- angular - 为什么 agular 材料排序显示箭头但不对列进行排序?