Downstream demand is not propagated upstream after encountering subscribeOn

Problem description

I have a Spring application with a method that is triggered on ApplicationReadyEvent:

  @EventListener(ApplicationReadyEvent.class)
  public void poll() throws InterruptedException {
    Flux.just(1, 2, 3)
        .doOnNext(System.out::println)
        .log()
        .subscribeOn(Schedulers.boundedElastic())
        .log()
        .flatMap(Mono::just)
        .subscribe();
    System.out.println("Thread exiting: " + Thread.currentThread().getName());
    Thread.sleep(Integer.MAX_VALUE);
  }

Here are the logs:

{"timeStamp":"2021-04-21T15:16:31.288+05:30","message":"onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)","logger":"reactor.Flux.SubscribeOn.2","thread":"main","level":"INFO"}
{"timeStamp":"2021-04-21T15:16:31.289+05:30","message":"request(256)","logger":"reactor.Flux.SubscribeOn.2","thread":"main","level":"INFO"}
Thread exiting: main

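The request does not seem to propagate above subscribeOn: only the second log() (reactor.Flux.SubscribeOn.2) prints anything, and no element is ever emitted. However, if I don't use subscribeOn, it works as expected.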

  @EventListener(ApplicationReadyEvent.class)
  public void poll() throws InterruptedException {
    Flux.just(1, 2, 3)
        .doOnNext(System.out::println)
        .log()
        .flatMap(Mono::just)
        .subscribe();
    System.out.println("Thread exiting: " + Thread.currentThread().getName());
    Thread.sleep(Integer.MAX_VALUE);
  }

Logs:

{"timeStamp":"2021-04-21T15:20:35.370+05:30","message":"| onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
{"timeStamp":"2021-04-21T15:20:35.371+05:30","message":"| request(256)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
1
{"timeStamp":"2021-04-21T15:20:35.371+05:30","message":"| onNext(1)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
2
{"timeStamp":"2021-04-21T15:20:35.371+05:30","message":"| onNext(2)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
3
{"timeStamp":"2021-04-21T15:20:35.372+05:30","message":"| onNext(3)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
{"timeStamp":"2021-04-21T15:20:35.372+05:30","message":"| onComplete()","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
Thread exiting: main

What is the reason behind this behavior?

Tags: java, reactive-programming, project-reactor

Solution


The problem was caused by a forwarding ScheduledExecutorService implementation that I had registered in place of Reactor's default one:

Schedulers.addExecutorServiceDecorator(
        "forwardingScheduledExecutorService",
        (scheduler, scheduledExecutorService) ->
            new ForwardingScheduledExecutorService(scheduledExecutorService));
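
The answer does not show ForwardingScheduledExecutorService itself, so the exact defect is unknown. What is known is that subscribeOn hands the upstream subscription to a scheduler worker as a task; if a wrapper does not pass a submitted or scheduled task on to the underlying executor, that task never runs and nothing above subscribeOn sees a subscription or a request, which would match the first log. As a rough sketch only, a wrapper that forwards every method unchanged might look like the following. DelegatingScheduledExecutor is a hypothetical name used for illustration, not the class from the answer.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical wrapper: every method passes the call to the delegate unchanged.
final class DelegatingScheduledExecutor implements ScheduledExecutorService {
  private final ScheduledExecutorService delegate;

  DelegatingScheduledExecutor(ScheduledExecutorService delegate) {
    this.delegate = delegate;
  }

  // Task submission: a wrapper that drops any of these calls silently loses work.
  @Override public void execute(Runnable command) { delegate.execute(command); }
  @Override public Future<?> submit(Runnable task) { return delegate.submit(task); }
  @Override public <T> Future<T> submit(Runnable task, T result) { return delegate.submit(task, result); }
  @Override public <T> Future<T> submit(Callable<T> task) { return delegate.submit(task); }

  // Delayed and periodic scheduling.
  @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    return delegate.schedule(command, delay, unit);
  }
  @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    return delegate.schedule(callable, delay, unit);
  }
  @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
  }
  @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
  }

  // Bulk invocation.
  @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
    return delegate.invokeAll(tasks);
  }
  @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
      throws InterruptedException {
    return delegate.invokeAll(tasks, timeout, unit);
  }
  @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
      throws InterruptedException, ExecutionException {
    return delegate.invokeAny(tasks);
  }
  @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    return delegate.invokeAny(tasks, timeout, unit);
  }

  // Lifecycle.
  @Override public void shutdown() { delegate.shutdown(); }
  @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
  @Override public boolean isShutdown() { return delegate.isShutdown(); }
  @Override public boolean isTerminated() { return delegate.isTerminated(); }
  @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return delegate.awaitTermination(timeout, unit);
  }

  // Small demo: a task submitted through the wrapper actually runs on the delegate.
  public static void main(String[] args) throws Exception {
    ScheduledExecutorService wrapped =
        new DelegatingScheduledExecutor(Executors.newSingleThreadScheduledExecutor());
    Future<String> result = wrapped.submit(() -> "task ran");
    System.out.println(result.get(1, TimeUnit.SECONDS));
    wrapped.shutdown();
  }
}
```

A quick way to check a custom wrapper in isolation is to submit and schedule a trivial task through it, as in main above, and confirm the task completes before registering the wrapper with Schedulers.addExecutorServiceDecorator.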
