首页 > 解决方案 > WebFlux:Flux.reduce() 是否传播 Mono.error()?

问题描述

爪哇 11

春季启动 2.2.6

WebFlux Netty 应用程序在 WebClient REST 调用上使用 Flux.parallell 流:

private Mono<MySingleResponse> sendDocument(Document doc) {
    return documentWebClient
            .post()
            .uri("/upload")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(doc)
            .retrieve()
            .onStatus(HttpStatus::isError, clientResponse ->  Mono.error(new MyServerException()))
            .bodyToMono(MySingleResponse.class);
}

设置在链上:

            .then(doStuff())
            .then(Flux.fromIterable(documents)
                            .parallel()
                            .runOn(Schedulers.elastic())
                            .flatMap(document -> sendDocument(document))
                            .sequential()
                            .reduce(new MyCompleteResponse(), myReduce()))
            .then(doOtherStuff());
}

减白:

BiFunction<MyCompleteResponse, MySingleResponse, MyCompleteResponse > myReduce () {
    return (o2, o1) -> {
        List< MySingleResponse> singleResponses = new ArrayList<>(o2.getSingleResponses());
        responses.add(o1);
        return new MyCompleteResponse(singleResponses);
    };
}

如果 WebClient 收到错误响应,reduce 是否使用 Mono.error() 响应或收集所有响应?

换句话说:我是否会丢失有关其他 REST 请求的所有信息?

标签: javareactive-programmingspring-webfluxreduce

解决方案


推荐阅读