java - 在遇到订阅后,下游需求不会向上游传播
问题描述
我有一个使用 asSpring
触发的应用程序ApplicationReadyEvent
:
@EventListener(ApplicationReadyEvent.class)
public void poll() throws InterruptedException {
Flux.just(1, 2, 3)
.doOnNext(System.out::println)
.log()
.subscribeOn(Schedulers.boundedElastic())
.log()
.flatMap(Mono::just)
.subscribe();
System.out.println("Thread exiting: " + Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE);
}
以下是日志:
{"timeStamp":"2021-04-21T15:16:31.288+05:30","message":"onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)","logger":"reactor.Flux.SubscribeOn.2","thread":"main","level":"INFO"}
{"timeStamp":"2021-04-21T15:16:31.289+05:30","message":"request(256)","logger":"reactor.Flux.SubscribeOn.2","thread":"main","level":"INFO"}
Thread exiting: main
该请求似乎在上面传播subscribeOn
。但是,如果我不使用subscribeOn
,它似乎按预期正常工作。
@EventListener(ApplicationReadyEvent.class)
public void poll() throws InterruptedException {
Flux.just(1, 2, 3)
.doOnNext(System.out::println)
.log()
.flatMap(Mono::just)
.subscribe();
System.out.println("Thread exiting: " + Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE);
}
日志:
{"timeStamp":"2021-04-21T15:20:35.370+05:30","message":"| onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
{"timeStamp":"2021-04-21T15:20:35.371+05:30","message":"| request(256)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
1
{"timeStamp":"2021-04-21T15:20:35.371+05:30","message":"| onNext(1)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
2
{"timeStamp":"2021-04-21T15:20:35.371+05:30","message":"| onNext(2)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
3
{"timeStamp":"2021-04-21T15:20:35.372+05:30","message":"| onNext(3)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
{"timeStamp":"2021-04-21T15:20:35.372+05:30","message":"| onComplete()","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
Thread exiting: main
这种行为背后的原因是什么?
解决方案
这个问题是由我使用的转发ScheduledExecutorService
类实现的,而不是 Reactor 的默认服务造成的:
Schedulers.addExecutorServiceDecorator(
"forwardingScheduledExecutorService",
(scheduler, scheduledExecutorService) ->
new ForwardingScheduledExecutorService(scheduledExecutorService));
推荐阅读
- mysql - sql根据两列或多列的条件选择多行
- python - 我的脚本在单击链接时遇到错误
- opencv - OpenCV 实时颜色/形状检测
- python - 带有 group by 并匹配条件的 ffil
- jquery - 解决 jQuery UI 在动画时不考虑边距的问题
- sdk - 指纹 U Are U 4500 PowerBuilder
- php - Wordpress Memberships - 获取计划中对特定元键具有非空值的所有活动成员。还检索其他元数据
- c - 为什么递归函数出现在交换函数之前?
- apache-spark - Py4JJavaError:调用 o288.fit 时出错
- javascript - 如何使用 javascript 访问 XML 子节点