首页 > 解决方案 > Project Reactor:在订阅和发布时线程将如何创建,流程如何?,堆栈跟踪?

问题描述

简单的例子来理解线程流。

  1. [ gshp subscribedOn-1 ] 信息reactor.Flux.FlatMap.1 -onSubscribe( FluxFlatMap.FlatMapMain )

  2. [ gshp publishOn-7 ] 信息reactor.Flux.FlatMap.1 - onNext(6)

这里reactor.Flux.FlatMap.1gshp subscribedOn-1gshp publishOn-7 都是通用的

当我们运行 java 时,它从主线程开始,然后会发生什么,它会创建gshp subscribedOn-1还是reactor.Flux.FlatMap.1

  @Test
  public void setUpTestTest() {
      Scheduler scheduler1 = Schedulers.newParallel("gshp subscribedOn", 3);
      Scheduler scheduler2 = Schedulers.newParallel("gshp publishOn", 6);
      Flux<String> flux = Flux.range(1, 200)
                              .flatMap(s-> Flux.just(""+s)
                                               .publishOn(scheduler2)
                                               .concatMap(d->processMessagefluxpause(d, "test")))
                                               .log()
                              .subscribeOn(scheduler1);

    StepVerifier.create(flux).expectNextCount(20).verifyComplete();
}

这意味着什么,流程如何? 在此处输入图像描述

标签: javareactive-programmingspring-webfluxproject-reactor

解决方案


在这里你可以阅读subscribeOnpublishOn

发布与订阅

基本上只要有人订阅,就会创建整个链,并为调用分配一个线程。如果您在链中的任何地方都有a subscribeOn,则整个调用将使用此调度程序。所以放在哪里都无所谓subscribeOn

您可以在日志中看到,它开始了,调用被放置在subscribeOn调度程序上。

onPublish另一方面,一旦我们到达该语句,线程就会在中途切换到该调度程序。所以这更多地取决于它在链中的位置。

您的日志显示,当内部 Flux 发出时,它正在onPublish调度程序上发出。


推荐阅读