rx-java - RxJava2 - 间隔和调度器
问题描述
假设我有一个间隔,并且我给了它一个计算调度程序。像这样:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.flatMap { ... }
那么,flatmap {...} 中发生的一切是否也会安排在计算线程上?
在 Observable.interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) 的源代码中,它说:
* @param scheduler
* the Scheduler on which the waiting happens and items are emitted
作为 RxJava 的初学者,我很难理解这个评论。我知道间隔计时器/等待逻辑发生在计算线程上。但是,关于正在发射的项目的最后一部分是否也意味着发射的项目将在同一个线程上消耗?还是需要一个observeOn?像这样:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.observeOn(computationScheduler)
.flatMap { ... }
如果我希望在计算线程上处理发射,那是否需要观察?
解决方案
这很容易验证:只需打印当前线程即可查看操作符在哪个线程上执行:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
这将始终打印:
on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9
顺序处理,因为所有都发生在一个线程中 -> main
。
observeOn
将改变下游执行线程:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.observeOn(Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.observeOn(Schedulers.io())
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
这次的结果对于每次执行都会有所不同,但flatmap
会subscribe
在不同的线程中处理:
on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1
interval
将充当observeOn
并更改下游执行线程(调度程序):
Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
这次执行是在计算调度程序的一个线程内顺序执行的:
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...
interval
默认情况下将使用计算调度程序,您不需要将其作为参数传递,observeOn
也不需要
推荐阅读
- javascript - 为什么 react-redux 将状态与身份 (===) 运算符进行比较?不能这样比较对象
- apache-drill - 当 NullHandling 设置为 INTERNAL 时,找不到函数签名的匹配项
- sql - 我正在尝试将 bacclaim 表中的 loss_dt 更新为 claim_id 上的其他表 bactrans 中的 trans_dt,其中 loss_Dt >trans_dt
- ios - Google Places API 搜索结果在地图上刷新
- java - 检测字符串的正则表达式仅包含 . 或者 ?或者?
- php - PHP Guzzle - 如果我只是将它用于中间件,我需要设置一个处理程序吗?
- python - 哪些 Continuum API 可用于跟踪工件、包和发布?
- python - Amazon Transcribe on S3 Upload:“[错误] BadRequestException:提供的 URI 未指向 S3 对象”
- c# - 如何在设计自动化 api 中为 revit 创建多个文件并将结果下载为一个 zip 文件
- javascript - 如何在 Loopback/Node.Js 中创建角色