spring-webflux - Reactor 框架与组装时间和订阅时间混淆(何时调用订阅)
问题描述
我实际上对组装时间和订阅时间感到困惑。我知道单声道是懒惰的,并且在订阅之前不会被执行。下面是一个方法。
public Mono<UserbaseEntityResponse> getUserbaseDetailsForEntityId(String id) {
GroupRequest request = ImmutableGroupRequest
.builder()
.cloudId(id)
.build();
Mono<List<GroupResponse>> response = ussClient.getGroups(request);
List<UserbaseEntityResponse.GroupPrincipal> groups = new CopyOnWriteArrayList<>();
response.flatMapIterable(elem -> elem)
.toIterable().iterator().forEachRemaining(
groupResponse -> {
groupResponse.getResources().iterator().forEachRemaining(
resource -> {
groups.add(ImmutableGroupPrincipal
.builder()
.groupId(resource.getId())
.name(resource.getDisplayName())
.addAllUsers(convertMemebersToUsers(resource))
.build());
}
);
}
);
log.debug("Response Object - " + groups.toString());
ImmutableUserbaseEntityResponse res = ImmutableUserbaseEntityResponse
.builder()
.userbaseId(id)
.addAllGroups(groups)
.build();
Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
.sequential();
return Mono.just(res);
}
这会在Mono<List<GroupResponse>> response = ussClient.getGroups(request);
不调用订阅的情况下执行,但是除非我调用订阅,否则下面不会执行。
Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
.sequential();
我可以在组装时间与订阅方面获得更多信息吗?
解决方案
“在您订阅之前什么都不会发生”并非在所有情况下都是正确的。Mono
发布者(或Flux
)将在三种情况下执行:
- 您订阅;
- 你阻止;
- 出版商很“热”。
请注意,上述场景都适用于整个反应链 - 即,如果我订阅发布者,则上游的所有内容(依赖于该发布者)也会执行。这就是为什么框架可以并且应该在需要时调用 subscribe,从而导致控制器中定义的反应链执行。
在您的情况下,它实际上是其中的第二个-您正在阻塞,这本质上是“订阅并等待结果”。通常阻塞的方法会被清楚地标记,但情况并非总是如此 - 在您的情况下,它是执行阻塞的toIterable()
方法:Flux
将此 Flux 转换为对
Iterator.next()
调用的惰性 Iterable 阻塞。
但是啊,你说,我没有打电话Iterator.next()
——什么给了?!
好吧,隐含地你是通过调用forEachRemaining()
:
默认实现的行为如下:
while (hasNext()) action.accept(next());
...并且根据上述规则,由于ussClient.getGroups(request)
位于此阻塞调用的上游,因此它会被执行。
推荐阅读
- c++ - 如何克服屏幕外和屏幕上帧缓冲区渲染之间的差异?
- c++ - 如何使用 Eclipse IDE 清屏
- typescript - 打字稿:import * as moment from 'moment' vs. import moment from 'moment'
- r - 如何在 Windows 上从 R 运行 .sh 脚本?
- excel - 如何使用 .Union 在 .SpecialCells 方法中使用 2 个“类型”参数
- mysql - AWS-RDS 最大允许数据包值无法更改
- shiny - 对反应事件的数据操作
- python - y 与 Altair 的双轴
- ruby-on-rails - Rails / Ruby如何始终显示小数精度
- android - 在我的手机上运行我的 android studio 应用程序时,它会运行但不会将其保存在我的手机上