首页 > 解决方案 > Reactor 框架与组装时间和订阅时间混淆(何时调用订阅)

问题描述

我实际上对组装时间和订阅时间感到困惑。我知道单声道是懒惰的,并且在订阅之前不会被执行。下面是一个方法。

    public Mono<UserbaseEntityResponse> getUserbaseDetailsForEntityId(String id) {
        GroupRequest request = ImmutableGroupRequest
                .builder()
                .cloudId(id)
                .build();

        Mono<List<GroupResponse>> response = ussClient.getGroups(request);
        List<UserbaseEntityResponse.GroupPrincipal> groups = new CopyOnWriteArrayList<>();


        response.flatMapIterable(elem -> elem)
                .toIterable().iterator().forEachRemaining(
                    groupResponse -> {
                        groupResponse.getResources().iterator().forEachRemaining(
                            resource -> {
                                groups.add(ImmutableGroupPrincipal
                                        .builder()
                                        .groupId(resource.getId())
                                        .name(resource.getDisplayName())
                                        .addAllUsers(convertMemebersToUsers(resource))
                                        .build());
                            }
                        );
                    }
                );
        log.debug("Response Object - " + groups.toString());
        ImmutableUserbaseEntityResponse res = ImmutableUserbaseEntityResponse
                .builder()
                .userbaseId(id)
                .addAllGroups(groups)
                .build();


        Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
        .parallel()
                .runOn(Schedulers.parallel())
                .doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
                .sequential();

        return Mono.just(res);
    }

这会在Mono<List<GroupResponse>> response = ussClient.getGroups(request);不调用订阅的情况下执行,但是除非我调用订阅,否则下面不会执行。

Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
        .parallel()
                .runOn(Schedulers.parallel())
                .doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
                .sequential();

我可以在组装时间与订阅方面获得更多信息吗?

标签: spring-webfluxreactorspring5

解决方案


“在您订阅之前什么都不会发生”并非在所有情况下都是正确的。Mono发布者(或Flux)将在三种情况下执行:

  • 您订阅;
  • 你阻止;
  • 出版商很“热”。

请注意,上述场景都适用于整个反应链 - 即,如果我订阅发布者,则上游的所有内容(依赖于该发布者)也会执行。这就是为什么框架可以并且应该在需要时调用 subscribe,从而导致控制器中定义的反应链执行。

在您的情况下,它实际上是其中的第二个-您正在阻塞,这本质上是“订阅并等待结果”。通常阻塞的方法会被清楚地标记,但情况并非总是如此 - 在您的情况下,它是执行阻塞的toIterable()方法:Flux

将此 Flux 转换为对Iterator.next()调用的惰性 Iterable 阻塞。

但是啊,你说,我没有打电话Iterator.next()——什么给了?!

好吧,隐含地你是通过调用forEachRemaining()

默认实现的行为如下:

 while (hasNext())
     action.accept(next());

...并且根据上述规则,由于ussClient.getGroups(request)位于此阻塞调用的上游,因此它会被执行。


推荐阅读