首页 > 解决方案 > RxJava 中的“订阅者线程”是什么意思

问题描述

我正在读一本关于 RxJava 的书,下面是其中的摘录:

Observable.create(s -> {
   ... async subscription and data emission ...
})
.doOnNext(i -> System.out.println(Thread.currentThread()))
.filter(i -> i % 2 == 0)
.map(i -> "Value " + i + " processed on " + Thread.currentThread())
.subscribe(s -> System.out.println("SOME VALUE =>" + s));
System.out.println("Will print BEFORE values are emitted")”

他写道,由于操作是异步的,因此值将在与订阅者不同的线程上发出。由于订阅者的线程是非阻塞的,Will print BEFORE values are emitted将在任何值推送给订阅者之前打印。

我无法理解这个订阅者线程是什么?我能以某种方式得到它的名字吗?为了测试作者的意思,我写了一些代码:

Observable.create(
            subscriber -> {
              System.out.println("Entering thread: " + Thread.currentThread().getName());
              Thread t =
                  new Thread(
                      new Runnable() {
                        @Override
                        public void run() {
                          try {
                            System.out.println(
                                "Emitting threads: " + Thread.currentThread().getName());
                            Thread.sleep(5000);
                            subscriber.onNext(1);
                            subscriber.onNext(2);
                            subscriber.onNext(3);
                            subscriber.onCompleted();
                          } catch (InterruptedException e) {
                            subscriber.onError(e);
                          }
                        }
                      });
              t.start();
            })
        .subscribe(
            a -> {
              System.out.print("Subscriber thread: " + Thread.currentThread().getName() + " ");
              System.out.println(a);
            });
    System.out.println("Main thread exiting:");

当我运行上面的代码时,我发现订阅者线程与onNext()被调用的线程相同。由于这是一个异步操作,因此项目应该在与订阅者不同的线程上发出。怎么两条线都一样?

此外,即使在主线程退出后,程序也会继续运行,并且只有在所有元素都通过 onNext() 下推后才会终止。为什么会这样?为什么程序不退出?

标签: javarx-javareactive-programming

解决方案


订阅者线程是正在调用的线程subscribe。由于 RxJava 流程是这样的订阅调用链,因此这些调用可以通过subscribeOn.

例如:

Observable.fromCallable(() -> Thread.currentThread())
.subscribe(System.out::println);

System.out.println("Subscriber thread: " + Thread.currentThread());

这将打印订阅者线程两次。或者

Observable.fromCallable(() -> Thread.currentThread())
.subscribeOn(Schedulers.single())
.subscribe(System.out::println);

System.out.println("Subscriber thread: " + Thread.currentThread());

Thread.sleep(1000);

这将打印订阅者线程和 RxSingleScheduler 线程。


推荐阅读