首页 > 解决方案 > Spring中RESTful API上flux行为的解释

问题描述

我正在努力理解 Spring 的 Webflux 和反应式 API,并且希望能够解释我在使用注释而不是路由/处理程序声明 REST 端点时看到的通量行为差异。

我看到的是通量(流?)在通过注释定义的 REST 端点获取时正在执行,而不是从使用路由/处理程序语义定义的 REST 端点获取。

谁能解释观察到的行为的差异?下面提供的代码和控制台输出...

注意:DemoRequestHandler.getOddIntsMult()中的注释代码将导致通量执行并遍历包含的整数以查找/返回奇数值。我想我真正的问题是“为什么在一个实例中需要subscribe()而不是另一个?”

带注释的 REST 控制器...

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import static org.springframework.web.bind.annotation.RequestMethod.GET;

@RestController
public class DemoFluxController {

    @RequestMapping(method=GET, value="/v1/fluxMult")
    public Flux<Integer> getMultFlux() {
        System.out.println("DEBUG -> FluxController.getMultFlux()");
        return DemoFlux.getOddInts(DemoFlux.multIntFlux);
    }
}

用于返回具有奇数整数的通量的测试类...

import reactor.core.publisher.Flux;

public class DemoFlux {

    public static Flux<Integer> multIntFlux = Flux.range(1, 20);

    private static boolean isOdd(Integer intVal) {
        System.out.printf("DEBUG -> DemoFlux.isOdd( %d )%n", intVal);
        return intVal % 2 != 0;
    }

    public static Flux<Integer> getOddInts(Flux<Integer> intFlux) {
        return intFlux.filter(DemoFlux::isOdd);
    }
}

声明备用 REST 端点的路由器实现...

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class DemoRouter {

    @Bean
    public RouterFunction<ServerResponse> route(DemoRequestHandler requestHandler) {
        return RouterFunctions.route(RequestPredicates.GET("/v2/fluxMult")
                          .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), requestHandler::getOddIntsMult);
    }
}

与 REST 端点的路由关联的请求处理程序。

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class DemoRequestHandler {

    public Mono<ServerResponse> getOddIntsMult(ServerRequest request) {
        System.out.println("DEBYG -> DemoRequestHandler.getOddIntsMult()");

        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromObject(DemoFlux.getOddInts(DemoFlux.multIntFlux)));
//                .body(BodyInserters.fromObject(DemoFlux.getOddInts(DemoFlux.multIntFlux).subscribe()));
    }
}

运行的控制台输出。请注意不同的行为,具体取决于访问的 REST 端点...

DEBUG -> DemoFluxController.getMultFlux()
DEBUG -> DemoFlux.getOddInts()
DEBUG -> DemoFlux.isOdd( 1 )
DEBUG -> DemoFlux.isOdd( 2 )
DEBUG -> DemoFlux.isOdd( 3 )
DEBUG -> DemoFlux.isOdd( 4 )
DEBUG -> DemoFlux.isOdd( 5 )
DEBUG -> DemoFlux.isOdd( 6 )
DEBUG -> DemoFlux.isOdd( 7 )
DEBUG -> DemoFlux.isOdd( 8 )
DEBUG -> DemoFlux.isOdd( 9 )
DEBUG -> DemoFlux.isOdd( 10 )
DEBUG -> DemoFlux.isOdd( 11 )
DEBUG -> DemoFlux.isOdd( 12 )
DEBUG -> DemoFlux.isOdd( 13 )
DEBUG -> DemoFlux.isOdd( 14 )
DEBUG -> DemoFlux.isOdd( 15 )
DEBUG -> DemoFlux.isOdd( 16 )
DEBUG -> DemoFlux.isOdd( 17 )
DEBUG -> DemoFlux.isOdd( 18 )
DEBUG -> DemoFlux.isOdd( 19 )
DEBUG -> DemoFlux.isOdd( 20 )

DEBUG -> DemoRequestHandler.getOddIntsMult()
DEBUG -> DemoFlux.getOddInts()

标签: restspring-bootspring-webflux

解决方案


它们的行为不同,因为您没有对它们进行相同的编码。

人们在使用 webflux/reactive 编程时最常犯的错误之一是他们在应用程序中间订阅,因为他们想要或需要某物的具体价值。

WebFlux/反应式编程基本上是我们声明一个我们希望在有人订阅时发生的事件链。在这种情况下,它是客户端订阅的时候。所以你能感觉到有什么不对劲真是太好了,因为有!

你的问题在这里:

public Mono<ServerResponse> getOddIntsMult(ServerRequest request) {
    System.out.println("DEBYG -> DemoRequestHandler.getOddIntsMult()");

    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(BodyInserters.fromObject(              // This body inserter here
            DemoFlux.getOddInts(DemoFlux.multIntFlux)));
}

发生的情况是 BodyInserter 需要一个具体的值,如果你查看它的源代码,你会看到一个 body inserter 将 body 包装在一个单声道中。所以你正在做的是将Fluxa包裹起来,Mono所以你基本上得到的是 a Mono<Flux<Integer>>

在您使用 webflux 工作了一段时间后,您可以知道当您得到类似以下内容时就是这种情况:

{
    "disposed": true,
    "scanAvailable": true
}

然后你知道你得到了一个包裹的单声道/通量,你在某个地方错过了一个平面图。

如果您重写它并删除BodyInserter它,它将按预期工作。

public Mono<ServerResponse> getOddIntsMult(ServerRequest request) {
    System.out.println("DEBYG -> DemoRequestHandler.getOddIntsMult()");

    return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
            .body(DemoFlux.getOddInts(DemoFlux.multIntFlux), Integer.class);                
}

所以没有区别,你@RequestMapping(method=GET, value="/v1/fluxMult")不使用,DemoRequestHandler因此你得到不同的结果。


推荐阅读