java - RxJava中值发射代码和值接收代码的线程执行
问题描述
我有以下代码:
private static void log(Object msg) {
System.out.println(
Thread.currentThread().getName() +
": " + msg);
}
Observable<Integer> naturalNumbers = Observable.create(emitter -> {
log("Invoked"); // on main thread
Runnable r = () -> {
log("Invoked on another thread");
int i = 0;
while(!emitter.isDisposed()) {
log("Emitting "+ i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received "+i));
所以这里我们有 2 个重要的 lambda 表达式。第一个是我们传递给 Observable.create 的那个,第二个是我们传递给 Observable.subscribe() 的回调。在第一个 lambda 中,我们创建一个新线程,然后在该线程上发出值。在第二个 lambda 中,我们有代码来接收在第一个 lambda 代码中发出的那些值。我观察到两个代码都在同一个线程上执行。
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
Thread-0: Received 1
Thread-0: Emitting 2
Thread-0: Received 2
为什么会这样?默认情况下,RxJava 是否在同一线程上运行代码发射值(可观察)和代码接收值(观察者)?
解决方案
让我们看看,如果你使用 aThread
来执行一个 runnable 会发生什么:
测试
@Test
void threadTest() throws Exception {
log("main");
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(
() -> {
log("thread");
countDownLatch.countDown();
})
.start();
countDownLatch.await();
}
输出
main: main
Thread-0: thread
看来,主入口点是从main
线程调用的,而新创建的入口点是Thread
调用Thread-0
.
为什么会这样?默认情况下,RxJava 是否在同一线程上运行代码发射值(可观察)和代码接收值(观察者)?
默认情况下RxJava
是单线程的。因此,生产者,如果没有通过不同的定义observeOn
或不同的线程布局,将在(订阅者)线程subscribeOn
上发出值。consumer
这是因为RxJava
默认情况下会在订阅堆栈上运行所有内容。
示例 2
@Test
void fdskfkjsj() throws Exception {
log("main");
Observable<Integer> naturalNumbers =
Observable.create(
emitter -> {
log("Invoked"); // on main thread
Runnable r =
() -> {
log("Invoked on another thread");
int i = 0;
while (!emitter.isDisposed()) {
log("Emitting " + i);
emitter.onNext(i);
i += 1;
}
};
new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received " + i));
Thread.sleep(100);
}
输出2
main: main
main: Invoked
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
在您的示例中,很明显,main 方法是从主线程调用的。此外,subscribeActual
调用也在调用线程 ( main
) 上运行。但是Observable#create
lambdaonNext
从新创建的线程调用Thread-0
。该值从调用线程推送给订阅者。在这种情况下,调用线程是Thread-0
,因为它调用onNext
下游订阅者。
如何区分生产者和消费者?
使用observeOn
/subscribeOn
运算符来处理RxJava
.
我应该使用低级线程构造ẁith RxJava 吗?
不,您不应该使用new Thread
以将生产者与消费者分开。违约很容易,onNext
不能同时调用(交错),因此违约。这就是为什么RxJava
提供一个名为Scheduler
with Worker
s 的构造来减轻此类错误的原因。
注意:我认为这篇文章描述得很好:http: //introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html。请注意,这是 Rx.NET,但原理完全相同。如果您想阅读有关并发的信息,RxJava
还可以查看 Davids 博客(https://akarnokd.blogspot.com/2015/05/schedulers-part-1.html)或阅读本书(使用 RxJava 进行反应式编程https:// /www.oreilly.com/library/view/reactive-programming-with/9781491931646/)
推荐阅读
- node.js - 使用 node-redis 在设置上获取“ERR 语法错误”
- python - 如何知道 Pynput 中 on_press 和 on_release 之间的时间间隔?
- node.js - 下一个js调试不启动
- asp.net-mvc - 我们是否使用 Electron.Net 将 ASP.Net MVC 转换为桌面应用程序?
- kubernetes - 如何以可读的方式将两台主机之间的 udp 流量镜像到第三台远程 k8s 主机?
- java - 在 Spring WebServices 中实现标记接口
- opencart - 为什么点击任何商品卡Opencart 3.0.3.7后出现错误550
- javascript - 如何在 ejs 中包含页面(我要包含的页面也有包含语句)
- chromium - Chromium OS - 从 USB 启动时卡在“本地映像 A”上
- php - AJAX 请求不将数据返回到选择选项框