java - Project Reactor:在订阅和发布时线程将如何创建,流程如何?,堆栈跟踪?
问题描述
简单的例子来理解线程流。
[ gshp subscribedOn-1 ] 信息reactor.Flux.FlatMap.1 -onSubscribe( FluxFlatMap.FlatMapMain )
[ gshp publishOn-7 ] 信息reactor.Flux.FlatMap.1 - onNext(6)
这里reactor.Flux.FlatMap.1对gshp subscribedOn-1和gshp publishOn-7 都是通用的
当我们运行 java 时,它从主线程开始,然后会发生什么,它会创建gshp subscribedOn-1还是reactor.Flux.FlatMap.1?
@Test
public void setUpTestTest() {
Scheduler scheduler1 = Schedulers.newParallel("gshp subscribedOn", 3);
Scheduler scheduler2 = Schedulers.newParallel("gshp publishOn", 6);
Flux<String> flux = Flux.range(1, 200)
.flatMap(s-> Flux.just(""+s)
.publishOn(scheduler2)
.concatMap(d->processMessagefluxpause(d, "test")))
.log()
.subscribeOn(scheduler1);
StepVerifier.create(flux).expectNextCount(20).verifyComplete();
}
解决方案
在这里你可以阅读subscribeOn
和publishOn
。
基本上只要有人订阅,就会创建整个链,并为调用分配一个线程。如果您在链中的任何地方都有a subscribeOn
,则整个调用将使用此调度程序。所以放在哪里都无所谓subscribeOn
。
您可以在日志中看到,它开始了,调用被放置在subscribeOn
调度程序上。
onPublish
另一方面,一旦我们到达该语句,线程就会在中途切换到该调度程序。所以这更多地取决于它在链中的位置。
您的日志显示,当内部 Flux 发出时,它正在onPublish
调度程序上发出。
推荐阅读
- c++ - Raspberry Pi 集群的库
- c - 通过函数传递结构指针然后访问结构数据
- c# - 正则表达式来解析某些 html 标签
- c# - SSIS 脚本组件在用户代码中遇到异常
- html - 如何使用 CSS Grid 和容器类将其分为两列?
- csv - 如何使用 awk、sed 或其他方式从 CSV 日志文件中获取过去一小时、6 小时、24 小时的数据?
- python - 为 Windows 编写一个脚本文件(.bat 或 .cmd),这样我就可以在任何预装 Python 的 Windows 10 PC 上自动运行现有的 Django 项目
- reactjs - React Bootstrap 列未水平显示
- javascript - 库存物品数量
- c++ - 计算字符串中有多少特定字符无法正常工作