首页 > 解决方案 > SpringBoot中对下游客户端的流式响应

问题描述

我有一个控制器代理 api 端点,它接收针对不同服务的不同请求有效负载。该控制器验证有效负载并根据某些规则添加少量标头。在当前上下文中,我不想解析从上游服务接收到的响应。代理方法应该简单地将响应流式传输到下游客户端,以便在处理大型响应负载时可以很好地扩展而不会出现任何内存问题。

我已经实现了这样的方法:

suspend fun proxyRequest(
        url: String,
        request: ServerHttpRequest,
        customHeaders: HttpHeaders = HttpHeaders.EMPTY,
    ): ResponseEntity<String>? {
        val modifiedReqHeaders = getHeadersWithoutOrigin(request, customHeaders)

        val uri = URI.create(url)
        val webClient = proxyClient.method(request.method!!)
            .uri(uri)
            .body(request.body)
        modifiedReqHeaders.forEach {
            val list = it.value.iterator().asSequence().toList()
            val ar: Array<String> = list.toTypedArray()

            @Suppress("SpreadOperator")
            webClient.header(it.key, *ar)
        }

        return webClient.exchangeToMono { res ->
            res.bodyToMono(String::class.java).map { b -> ResponseEntity.status(res.statusCode()).body(b) }
        }.awaitFirstOrNull()
    }

但这似乎不是流媒体。当我尝试下载大文件时,它抱怨未能保存大数据缓冲区。有人可以帮助我编写反应式流式方法吗?

这就是我最终所做的。

suspend fun proxyRequest(
        url: String,
        request: ServerHttpRequest,
        response: ServerHttpResponse,
        customHeaders: HttpHeaders = HttpHeaders.EMPTY,
    ): Void? {
        val modifiedReqHeaders = getHeadersWithoutOrigin(request, customHeaders)

        val uri = URI.create(url)
        val webClient = proxyClient.method(request.method!!)
            .uri(uri)
            .body(request.body)
        modifiedReqHeaders.forEach {
            val list = it.value.iterator().asSequence().toList()
            val ar: Array<String> = list.toTypedArray()

            @Suppress("SpreadOperator")
            webClient.header(it.key, *ar)
        }

        val respEntity = webClient
            .retrieve()
            .toEntityFlux<DataBuffer>()
            .awaitSingle()
        response.apply {
            headers.putAll(respEntity.headers)
            statusCode = respEntity.statusCode
        }
        return response.writeWith(respEntity.body ?: Flux.empty()).awaitFirstOrNull()
    }

让我知道这是否真的向下游发送数据并刷新?

标签: spring-bootkotlinreactive-programmingspring-webflux

解决方案


您的第一个代码片段因内存问题而失败,因为它在内存中将整个响应主体缓冲为 aString并在之后转发它。如果响应非常大,您可能会填满整个可用内存。

第二种方法也失败了,因为Flux<DataBuffer>您只返回第一个,而不是返回整个响应(因此整个响应作为缓冲区)。这会失败,因为响应不完整。

即使您设法解决了这个特定问题,还有许多其他事项需要注意:

  • 似乎您没有返回原始响应标头,从而有效地更改了响应内容类型
  • 您不应该转发所有传入的响应标头,因为其中一些实际上取决于服务器(例如传输编码)
  • 与安全相关的请求/响应标头会发生什么?
  • 您如何处理跟踪和指标?

您可以查看Spring Cloud Gateway项目,该项目处理了很多这些细微之处,并允许您操纵请求/响应。


推荐阅读