首页 > 解决方案 > Spring WebFlux:WebClient 结合了 2 Reactive RESTful Web Service

问题描述

我正在使用 Spring WebFlux 开发具有反应式支持的微服务应用程序。让我们看看,我有一个属于一个类别的问题列表和每个问题的选项列表。我将问题和选项分离为具有响应式支持的服务,并且我希望使用 Spring WebFlux 的 WebClient 将另一个服务组合在一起。当然,它也需要支持 Reactive。

问题ServiceImpl:

public Flux<Question> getQuestions(String categoryId) {
    WebClient client = WebClient
        .builder()
        .baseUrl(getServiceUrl())
        .build();

    WebClient.ResponseSpec responseSpec = client
        .get()
        .uri("/questions/" + categoryId)
        .retrieve();

    return responseSpec.bodyToFlux(Question.class);
}

选项服务实现:

public Flux<Option> getOptions(String questionId) {
    WebClient client = WebClient
            .builder()
            .baseUrl(getServiceUrl())
            .build();

        WebClient.ResponseSpec responseSpec = client
            .get()
            .uri("/options/" + questionId)
            .retrieve();

        return responseSpec.bodyToFlux(Option.class);
}

但我不知道如何以反应方式将问题与其选项结合起来。任何人都可以提出一些想法吗?

更新的解决方案:

我添加了一个名为 CompositeQuestion 的新类

@Data
@AllArgsConstructor 
public class CompositeQuestion {

    private String id;

    private String description;

    private String categoryId;

    private List<Option> options;

}

现在要获取问题的列表选项,我的代码如下:

Flux<CompositeQuestion> compositQuestion = questionsFromCoreQuestionService.flatMap(question ->
        optionService.getOptions(question.getId())
            .collectList()
            .map(options -> new CompositeQuestion(question.getId(), question.getDescription(), question.getCategoryId(), options)))
        .subscribeOn(Schedulers.elastic());

标签: javamicroservicesspring-webfluxproject-reactor

解决方案


假设您有一个如下所示的类:

@Value
public class QuestionOptions {
     private Question question;
     private List<Option> options;
}

@Value注释来自Lombok

您可以使用以下选项检索问题:

Flux<String> categoryIds = Flux.just("1", "2", "3");
Flux<QuestionOptions> questionOptions = 
    categoryIds.flatMap(categoryId -> 
         // retrieve questions for each category
         questionService.getQuestions(categoryId)
              // get options for each question 
              .flatMap(question -> optionService.getOptions(question.getId())
              .collectList()
              .map(optionList -> new QuestionOptions(question, optionList))
         ))
    .subscribeOn(Schedulers.elastic()); // retrieve each question on a different thread.

请注意,如果类别的顺序可能与您请求的顺序不同。如果这对您来说是一个交易破坏者,您可能会考虑使用concatMap()而不是flatMap(),尽管每个请求都会按顺序运行。


推荐阅读