java - RxJava 中的“订阅者线程”是什么意思
问题描述
我正在读一本关于 RxJava 的书,下面是其中的摘录:
Observable.create(s -> {
... async subscription and data emission ...
})
.doOnNext(i -> System.out.println(Thread.currentThread()))
.filter(i -> i % 2 == 0)
.map(i -> "Value " + i + " processed on " + Thread.currentThread())
.subscribe(s -> System.out.println("SOME VALUE =>" + s));
System.out.println("Will print BEFORE values are emitted")”
他写道,由于操作是异步的,因此值将在与订阅者不同的线程上发出。由于订阅者的线程是非阻塞的,Will print BEFORE values are emitted
将在任何值推送给订阅者之前打印。
我无法理解这个订阅者线程是什么?我能以某种方式得到它的名字吗?为了测试作者的意思,我写了一些代码:
Observable.create(
subscriber -> {
System.out.println("Entering thread: " + Thread.currentThread().getName());
Thread t =
new Thread(
new Runnable() {
@Override
public void run() {
try {
System.out.println(
"Emitting threads: " + Thread.currentThread().getName());
Thread.sleep(5000);
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
});
t.start();
})
.subscribe(
a -> {
System.out.print("Subscriber thread: " + Thread.currentThread().getName() + " ");
System.out.println(a);
});
System.out.println("Main thread exiting:");
当我运行上面的代码时,我发现订阅者线程与onNext()
被调用的线程相同。由于这是一个异步操作,因此项目应该在与订阅者不同的线程上发出。怎么两条线都一样?
此外,即使在主线程退出后,程序也会继续运行,并且只有在所有元素都通过 onNext() 下推后才会终止。为什么会这样?为什么程序不退出?
解决方案
订阅者线程是正在调用的线程subscribe
。由于 RxJava 流程是这样的订阅调用链,因此这些调用可以通过subscribeOn
.
例如:
Observable.fromCallable(() -> Thread.currentThread())
.subscribe(System.out::println);
System.out.println("Subscriber thread: " + Thread.currentThread());
这将打印订阅者线程两次。或者
Observable.fromCallable(() -> Thread.currentThread())
.subscribeOn(Schedulers.single())
.subscribe(System.out::println);
System.out.println("Subscriber thread: " + Thread.currentThread());
Thread.sleep(1000);
这将打印订阅者线程和 RxSingleScheduler 线程。
推荐阅读
- r - 为什么使用 RScript 调用 R 脚本有效,但不适用于 R 二进制文件?
- com - 如何在 SourceSafe 中获取项目 (IVssItem) 的日期?
- python - 在 django 中创建/更新模型表单的最佳方法
- ios - SwiftUI ForEach 未更新 ObservableObject 类数组的以下更改
- github - 如何使用 ESP8266 上的令牌向 GitHub 进行身份验证?
- python - 操作类型未在桌面上运行的二进制文件中注册“MaxBytesInUse”
- python - 如何在 50 秒后杀死一个子进程?
- javascript - 如何检查子数组中的元素是否存在于数组的其他子数组中并通过第一个索引的值进行排序
- android - ActionBar 已经覆盖了页面的内容
- javascript - 调整屏幕大小:弄乱网站上的部分