首页 > 解决方案 > RxJava2 - 间隔和调度器

问题描述

假设我有一个间隔,并且我给了它一个计算调度程序。像这样:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

那么,flatmap {...} 中发生的一切是否也会安排在计算线程上?

在 Observable.interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) 的源代码中,它说:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

作为 RxJava 的初学者,我很难理解这个评论。我知道间隔计时器/等待逻辑发生在计算线程上。但是,关于正在发射的项目的最后一部分是否也意味着发射的项目将在同一个线程上消耗?还是需要一个observeOn?像这样:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

如果我希望在计算线程上处理发射,那是否需要观察?

标签: rx-javarx-java2

解决方案


这很容易验证:只需打印当前线程即可查看操作符在哪个线程上执行:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这将始终打印:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

顺序处理,因为所有都发生在一个线程中 -> main

observeOn将改变下游执行线程:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

这次的结果对于每次执行都会有所不同,但flatmapsubscribe在不同的线程中处理:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

interval将充当observeOn并更改下游执行线程(调度程序):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这次执行是在计算调度程序的一个线程内顺序执行的:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

interval默认情况下将使用计算调度程序,您不需要将其作为参数传递,observeOn也不需要


推荐阅读