Async streaming with Spring MVC

Problem description

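This is part of a Spring Web MVC application that uses WebClient for OAuth2 client integration. Its purpose is to proxy some requests to and from a resource server with the proper authorization header, transferring data from an asynchronous Flux<DataBuffer> to a synchronous OutputStream. I am doing it this way because I could not find a way to have Spring magically handle a Mono<ResponseEntity<Flux<DataBuffer>>> as a return type. I mean, it does something with it, but not what I want, which is to stream the raw binary data.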

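import static org.springframework.security.oauth2.client.web.reactive.function.client.ServletOAuth2AuthorizedClientExchangeFilterFunction.oauth2AuthorizedClient;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import javax.servlet.http.HttpServletRequest; // jakarta.servlet.http.HttpServletRequest on Spring Framework 6+

import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.security.oauth2.client.OAuth2AuthorizedClient;
import org.springframework.security.oauth2.client.annotation.RegisteredOAuth2AuthorizedClient;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

import reactor.core.publisher.Flux;

@RestController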
public class ResourcePassthroughController {

    private final WebClient webClient;
    private final AntPathMatcher pathMatcher = new AntPathMatcher();
    private final String resourceServerUri;
    private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();

    public ResourcePassthroughController(
            WebClient webClient,
            @Value("${app.urls.my-client-resource-server}") String uri) {
        this.webClient = webClient;
        this.resourceServerUri = uri;
    }

    @RequestMapping(value = "/resource/**")
    // public Mono<ResponseEntity<Flux<DataBuffer>>> getResource(
    public ResponseEntity<StreamingResponseBody> getResource(
            @RegisteredOAuth2AuthorizedClient("my-client") OAuth2AuthorizedClient authorizedClient,
            HttpServletRequest req) throws IOException {
        String resourcePath = "/" + pathMatcher.extractPathWithinPattern("/resource/**", req.getServletPath()) // req.getPathInfo() // ?
                + (req.getPathInfo() != null ? req.getPathInfo() : "")
                + (req.getQueryString() != null ? "?" + req.getQueryString() : "");

        // Mono<ResponseEntity<Flux<DataBuffer>>> mono = this
        //     .webClient
        //     .method(HttpMethod.valueOf(req.getMethod()))
        //     .uri(this.resourceServerUri + resourcePath)
        //     .attributes(oauth2AuthorizedClient(authorizedClient))
        //         // .accept(MediaType.APPLICATION_JSON)
        //     .retrieve()
        //     .toEntityFlux(DataBuffer.class);
        // return mono;

        ResponseEntity<Flux<DataBuffer>> responseEntity = this
            .webClient
            .method(HttpMethod.valueOf(req.getMethod()))
            .uri(this.resourceServerUri + resourcePath)
            .body(BodyInserters.fromDataBuffers(DataBufferUtils.read(new InputStreamResource(req.getInputStream()), bufferFactory, 4096)))
            .attributes(oauth2AuthorizedClient(authorizedClient))
            .retrieve()
            .toEntityFlux(DataBuffer.class)
            .block()
            ;
        // Flux operators return a new Flux, so timeout() only takes effect on the instance that is actually subscribed to.
        Flux<DataBuffer> body = responseEntity
            .getBody()
            // .timeout(Duration.ofSeconds(90), Flux.empty());
            .timeout(Duration.ofSeconds(90)) // fail with a TimeoutException if no buffer arrives within 90 seconds
            ;

        HttpHeaders headers = responseEntity.getHeaders();

        StreamingResponseBody streamer = (outputStream) -> {
            Flux<DataBuffer> flux = DataBufferUtils
                .write(body, outputStream)
                // .timeout(Duration.ofSeconds(120), Flux.error(() -> new IOException("Flux proxy write timeout")))
                .timeout(Duration.ofSeconds(120)) // fail with a TimeoutException if no buffer is written within 120 seconds
                .publish()
                .autoConnect(2); // expect two subscribers (.subscribe and .blockLast)
            flux.subscribe(DataBufferUtils.releaseConsumer()); // release each written buffer to avoid a memory leak
            try {
                flux.blockLast(); // once this call returns, the streamer function returns too, so Spring can then close the outputStream it has given us
            } catch (RuntimeException ex) {
                // blockLast() wraps checked exceptions such as TimeoutException in a RuntimeException
                Throwable cause = ex.getCause();
                if (cause instanceof TimeoutException) {
                    throw new IOException("Flux proxy timeout", cause);
                }
                throw ex;
            }
        };
        return ResponseEntity.ok().headers(headers)
                // .contentLength(headers.getContentLength())
                // .contentType(headers.getContentType())
                .body(streamer);
    }
}

However, when I switch to WebFlux, Netty happens to handle the streaming just fine:

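import static org.springframework.security.oauth2.client.web.reactive.function.client.ServerOAuth2AuthorizedClientExchangeFilterFunction.clientRegistrationId;

import java.time.Duration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.ResponseEntity;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController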
public class ResourcePassthroughController {

    private final WebClient webClient;

    @Value("${app.main-oauth2-client-registration}")
    String oauth2ClientRegistration;

    public ResourcePassthroughController(WebClient webClient) {
        this.webClient = webClient;
    }
    @RequestMapping(value = "/resource/{*path}")
    public Mono<ResponseEntity<Flux<DataBuffer>>> getResource( // this return type does NOT work with servlet, may be a missing decoder
        // @RegisteredOAuth2AuthorizedClient("${app.main-oauth2-client-id}") OAuth2AuthorizedClient authorizedClient,
        ServerHttpRequest request,
        @PathVariable("path") String resourcePath,
        @RequestParam MultiValueMap<String,String> queryMap) {

        return this.webClient
            .method(request.getMethod())
            .uri(builder -> builder
                 .path(resourcePath) // append to path defined in WebClientConfig
                 .queryParams(queryMap)
                 .build()
                 )
            .body(BodyInserters.fromDataBuffers(request.getBody()))
            .attributes(clientRegistrationId(oauth2ClientRegistration)) // OAuth2AuthorizedClient extracted from current logged-in user
            .retrieve()
            .toEntityFlux(DataBuffer.class)
            .timeout(Duration.ofSeconds(90))
            ;
    }
}

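But for now I would really like to stay with servlets and somehow make the asynchronous part work properly around my Flux of DataBuffers (which are also ByteBuffers). Is there a good way to do this?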
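
For reference, the bridging pattern the servlet version relies on (subscribe to an asynchronous source, write each chunk to a blocking OutputStream, and hold the calling thread until the source completes) can be sketched without Spring or Reactor, using the JDK's java.util.concurrent.Flow API. This is only an illustration under assumptions, not the approach Spring itself takes: BlockingStreamBridge, copy and demo are hypothetical names, byte[] chunks stand in for DataBuffer, and the timeout here covers the whole transfer rather than the gap between elements as Reactor's timeout(Duration) does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: bridges an asynchronous publisher to a blocking OutputStream.
public class BlockingStreamBridge {

    /** Writes every published chunk to the stream and blocks until the source completes. */
    public static void copy(Flow.Publisher<byte[]> source, OutputStream out,
                            long timeout, TimeUnit unit)
            throws IOException, InterruptedException, TimeoutException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        AtomicReference<Flow.Subscription> subscription = new AtomicReference<>();

        source.subscribe(new Flow.Subscriber<byte[]>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription.set(s);
                s.request(1); // backpressure: ask for one chunk at a time
            }

            @Override
            public void onNext(byte[] chunk) {
                try {
                    out.write(chunk);
                    subscription.get().request(1); // only request more once the write succeeded
                } catch (IOException e) {
                    subscription.get().cancel();
                    failure.set(e);
                    done.countDown();
                }
            }

            @Override
            public void onError(Throwable t) {
                failure.set(t);
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        });

        // Block the calling (servlet-style) thread; the timeout applies to the whole transfer.
        if (!done.await(timeout, unit)) {
            Flow.Subscription s = subscription.get();
            if (s != null) {
                s.cancel();
            }
            throw new TimeoutException("source did not complete within the timeout");
        }
        Throwable t = failure.get();
        if (t instanceof IOException) {
            throw (IOException) t;
        }
        if (t != null) {
            throw new IOException("source failed", t);
        }
    }

    /** Publishes two chunks from another thread and returns what reached the stream. */
    public static String demo() throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SubmissionPublisher<byte[]> publisher = new SubmissionPublisher<>();
        Thread producer = new Thread(() -> {
            // SubmissionPublisher drops items submitted before anyone subscribed, so wait first.
            while (!publisher.hasSubscribers()) {
                Thread.onSpinWait();
            }
            publisher.submit("hello ".getBytes(StandardCharsets.UTF_8));
            publisher.submit("world".getBytes(StandardCharsets.UTF_8));
            publisher.close(); // signals onComplete after the buffered chunks are delivered
        });
        producer.start();
        copy(publisher, out, 5, TimeUnit.SECONDS);
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints "hello world"
    }
}
```

Requesting one chunk at a time keeps the producer from running ahead of the blocking writes, which is roughly the role backpressure plays when a Flux<DataBuffer> is drained into a servlet OutputStream.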

Tags: spring, spring-mvc, spring-webflux, spring-oauth2, spring-webclient
