spring-boot - Schdulers.elastic 不在 Reactor 中创建新线程
问题描述
我正在尝试创建一个流,其中通量发出 10 个项目,每个项目并行,每个项目休眠 1 秒。由于每个项目都在单独的线程上发布,我希望整个过程需要 1 秒。但是日志显示它需要 10 秒。
我尝试将 subscribeOn 更改为 publishOn,映射到 doOnNext。但它们似乎都不起作用。
我是 Reactor 的新手,正在尝试了解我哪里出错了。非常感激任何的帮助。谢谢
public void whenIsANewThreadCreated() {
Flux.range(1,10)
.publishOn(Schedulers.elastic())
.map(count -> {
logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
})
.blockLast();
}
2020-03-30 16:17:29.799 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 10
解决方案
您必须首先通过调用parallel
方法创建一个并行通量,并且您必须使用它runOn
来实现并行性。
Flux.range(1,10)
.parallel()
.runOn(Schedulers.elastic())
.map(count -> {
System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
}).subscribe();
- 不鼓励使用
Schedulers.boundedElastic()
as usingScheduler.elastic()
parallel
默认情况下将根据您的 CPU 内核创建线程。如果您想使用更多线程parallel(10)
- 我认为这就是您想要看到的。
推荐阅读
- javascript - 节点js子进程超时
- java - Java Hashtable:一个键 - 两个值,如何获得它们?
- android - 函数的返回类型为 'Future
' 但不以返回语句结尾 - snakemake - 蛇制造。创建多个目标时如何从命令行传递目标
- drupal - 如何用静态 HTML 页面临时替换 Drupal?
- spring-boot - 从 RSocket-Java 客户端连接到 Spring Boot RSocket 服务器时出错
- c# - 从主窗口处理用户控件内的按钮单击
- woocommerce - 如何根据产品类别在 WooCommerce 中自动设置交叉销售
- algorithm - 有人可以帮助我使用替换方法解决这种重复吗?
- javascript - CefSharp 调用页面事件