reactive-programming - Mono.fromCallable 线程行为
问题描述
我试图了解 Reactor 中阻塞函数的行为,但其他一些事情完全让我脱离了研究。这是代码:
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
private static int blockingMethod(int s) {
try {
Thread.sleep(100_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
}
这是AFAIK发生的事情的摘要:
订阅发生在
main
线程上。main
在内部变得自由flatMap
以从上游带来下一个元素。因此,doOnNext
应始终打印main
.处理 100_000 个元素后,
main
将变为 free 并 printhere
。
相反,这就是发生的事情:
前 256 个元素按预期打印在
main
(in ) 上。doOnNext
大约 1 秒后,接下来是 256,然后是下一个,依此类推。从第二批开始的元素被打印在
elastic
线程上。
以下是我的问题:
为什么要批量处理 256 个元素?
Schedulers.elastic()
应该按需创建线程,因此理想情况下应该始终有一个线程可用于从 main 获取请求(忽略 JVM 对我可以创建的线程数量的限制)。为什么将第二批(及以后)中的元素打印在
elastic
线程上?我希望它们能在main
. 事实上,当您删除阻塞调用时会发生这种情况public static void main(String[] args) throws InterruptedException { Flux.range(1, 100_000) .doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName())) .flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic())) .subscribe(); System.out.println("Here"); Thread.sleep(Integer.MAX_VALUE); }
在这里,所有元素都打印出来main
,doOnNext
并且here
仅在流完成时才打印(释放主线程)。
我错过了什么吗?
解决方案
您需要了解整个反应器是基于反应流规范构建的。因此,上面的每个运算符都是发布者和订阅者的组合。
在场景 2
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", thread: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.just(a).subscribeOn(Schedulers.elastic()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
发布者从主线程 (Flux.Range) 开始,订阅者 (doOnNext) 正在订阅(在主线程上调用 onNext 调用),它被委托给一个弹性线程,该线程的消耗速度比订阅者发布的速度要快。所以整个 Flux.Range 在主线程上被调用并被分发到弹性线程池来处理
在场景 1
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 100_000)
.doOnNext(a -> System.out.println(a + ", Publisher: " + Thread.currentThread().getName()))
.flatMap(a -> Mono.fromCallable(() -> blockingMethod(a)).subscribeOn(Schedulers.elastic())).doOnNext(a -> System.out.println(a + ", Subscriber: " + Thread.currentThread().getName()))
.subscribe();
System.out.println("Here");
Thread.sleep(Integer.MAX_VALUE);
}
private static int blockingMethod(int s) {
try {
Thread.sleep(100_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
}
Flux.Range 的前 256 次调用发生在主线程上。请记住,响应式流是背压驱动的编程,并且由于平面地图只能合并 256 个上游,平面地图运算符(它是订阅者)不会在 256 个事件后调用 onNext 调用,因为它们都被阻塞/等待完全的。
主线程现在被释放了,因为它完成了参与反应管道并走出传送带的责任。即基本上继续执行下一行代码。这就是为什么您会在前 256 个元素之后看到“Here”的原因。然后主线程继续按照代码无限期地阻塞。
一旦其中一个阻塞线程完成工作,它们将触发订阅者在完成较早请求的线程(即弹性线程)上调用 onNext。弹性线程现在会将调用委托给内部反应器管道中的另一个弹性线程。
这意味着对于我发出的同一个项目,打印“Publisher”的线程和打印“Subscriber”的线程是不一样的,即使它们是弹性线程。随意验证以下代码
推荐阅读
- java - 如何在递归实体中找到对象?
- sql - MSAccess SQL 查询 - 将所有表与多个表联合 - 重复字段
- react-native - react-native 与 expo 出错:在“Connect(App)”的上下文中找不到“store”
- kubernetes - 为什么这个 ceph pod 崩溃
- java - Spring自动装配调用错误的构造函数
- bash - Bash:在目录中的特定文件上嵌套for循环
- python-2.7 - while 循环无限运行。无法使用计时器终止 while 循环
- hyperledger-fabric - 如何在超级账本结构中升级/更新链码
- java - 单个无变量语句中当前月份的第一天(或最后一天)?
- python-3.x - 寻求澄清 pytest 断言是否通过或失败取决于稍后的异常