首页 > 解决方案 > 链式反应组件调用

问题描述

我有以下对象的列表,其方法返回反应类型Mono<?>

interface GuyWithReactiveReturnTypeMethod {

    Mono<String> execute();
}

class ReactiveGuysInvocator {

    Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToInvoke) {
        ???
    }
}

而且我需要一一调用所有的人(n''''''''''''''''''''''' 结果n+1''''''参数)结果,但我不确定如何遍历这样的列表。我想flatMap在 while 循环中 ing next 人:

public interface GuyWithReactiveReturnTypeMethod {

    Mono<String> execute(String string);
}

class ReactiveGuysInvocator {

    Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToExecute) {
        ListIterator<GuyWithReactiveReturnTypeMethod> iterator = guysToExecute.listIterator();
        Mono<String> currentResult = Mono.just("start");
        while (iterator.hasNext()) {
            GuyWithReactiveReturnTypeMethod guyToInvoke = iterator.next();
            currentResult = currentResult.flatMap(guyToInvoke::execute)
                    .doOnNext(object -> System.out.println("Executed!"))
                    .doOnError(error -> System.out.println("Error"));
        }
        return currentResult;
    }
}

但这种做法似乎完全不正确。有谁知道我怎么能实现这样的东西?

标签: javaspringreactive-programmingspring-webflux

解决方案


更新:flatMap很容易被滥用。确保您asynchronous在使用flatMap. 大多数情况下,在我看来,你可以用最少的Mono.just.

Flatmap是你必须对你提供的约束做些什么。

executeAllGuys(Arrays.asList(new GuyWithReactiveReturnTypeMethod[] {
        (s)->Mono.just(s+"1"), 
        (s)->Mono.just(s+"2"), 
        (s)->Mono.just(s+"3")})) 
.subscribe(System.out::println);

Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToExecute) {
    // your flow is starting here
    Mono<String> stringMono = Mono.just("start");
    for ( GuyWithReactiveReturnTypeMethod guyToInvoke: guysToExecute) {
        stringMono = stringMono.flatMap(guyToInvoke::execute);
    }
    return stringMono;
}

看看所有这些Mono.just电话。为什么要创建 N+1 流来完成这项工作?真正的问题是每次执行接口方法时都会创建一个新流程。Flatmap停止当前流程并使用该flatMap方法返回的发布者启动新流程。试着像溪流一样思考reactive和对待整个业务。flatMap中没有Streams。反应式执行应仅在单个流上完成。

AMono<String> execute(String string)不是反应组件。它是一个反应式生产者。AMono<String> execute(Mono<String> string)是一个反应组件。

  1. 通过输入并返回.使您的界面更具反应性。您的应用程序在每一步都进行转换。这是“链接反应组件”。MonoMonomap

    executeAllGuys(Arrays.asList(new GuyWithReactiveReturnTypeMethod[] { (s)->s.map(str->str+"1"), (s)->s.map(str->str+"2"), (s) ->s.map(str->str+"3")})) .subscribe(System.out::println);

    Mono executeAllGuys(List guyToExecute) { // 你的流程从这里开始 Mono stringMono = Mono.just("start"); for ( GuyWithReactiveReturnTypeMethod guyToInvoke: guyToExecute) { stringMono = guyToInvoke.execute(stringMono); } 返回字符串单声道;}

    接口 GuyWithReactiveReturnTypeMethod { 单声道执行(单声道字符串);}

  2. 通过使用 a而不是列表,使您的界面不那么被动,但使您的应用程序更具反应性。Flux然后,您将不得不使用reduce将 Flux 转换为 Mono。您的应用程序正在执行 Map/Reduce 功能。我认为 a 不能Flux保证流中元素的执行顺序,但它可以更有效地执行AllGuys。

    // 你的流程从这里开始 executeAllGuys(Flux.just( (s)->s+"1", (s)->s+"2", (s)->s+"3")) .subscribe(System.out ::println);

    Mono executeAllGuys(Flux guyToExecute) { return guyToExecute.reduce("start", (str, guyToInvoke)->guyToInvoke.execute(str)); } 接口 GuyWithReactiveReturnTypeMethod { 字符串执行(字符串字符串);}

参考:反应式编程:Spring WebFlux:如何构建微服务调用链?


推荐阅读