spring - Asynchronous streaming with Spring MVC
Problem description
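This is part of a Spring Web MVC application that uses WebClient for its OAuth2 client integration. Its purpose is to proxy some requests to and from a resource server with the proper authorization headers, transferring the data from an asynchronous Flux<DataBuffer> to a synchronous OutputStream. I am doing it this way because I could not find a way to make Spring handle Mono<ResponseEntity<Flux<DataBuffer>>> as a return type out of the box. It does do something with that type, but not what I want, which is to stream the raw binary data.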
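import static org.springframework.security.oauth2.client.web.reactive.function.client.ServletOAuth2AuthorizedClientExchangeFilterFunction.oauth2AuthorizedClient;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import javax.servlet.http.HttpServletRequest; // jakarta.servlet.http.HttpServletRequest on Spring Framework 6+

import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.security.oauth2.client.OAuth2AuthorizedClient;
import org.springframework.security.oauth2.client.annotation.RegisteredOAuth2AuthorizedClient;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

import reactor.core.publisher.Flux;

@RestController
public class ResourcePassthroughController {

    private final WebClient webClient;
    private final AntPathMatcher pathMatcher = new AntPathMatcher();
    private final String resourceServerUri;
    private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();

    public ResourcePassthroughController(
            WebClient webClient,
            @Value("${app.urls.my-client-resource-server}") String uri) {
        this.webClient = webClient;
        this.resourceServerUri = uri;
    }

    @RequestMapping(value = "/resource/**")
    // public Mono<ResponseEntity<Flux<DataBuffer>>> getResource(
    public ResponseEntity<StreamingResponseBody> getResource(
            @RegisteredOAuth2AuthorizedClient("my-client") OAuth2AuthorizedClient authorizedClient,
            HttpServletRequest req) throws IOException {
        // Path below /resource, plus any extra path info and the original query string
        String resourcePath = "/" + pathMatcher.extractPathWithinPattern("/resource/**", req.getServletPath())
                + (req.getPathInfo() != null ? req.getPathInfo() : "")
                + (req.getQueryString() != null ? "?" + req.getQueryString() : "");

        // First attempt: return the Mono directly (does not stream raw bytes on the servlet stack)
        // Mono<ResponseEntity<Flux<DataBuffer>>> mono = this
        //         .webClient
        //         .method(HttpMethod.valueOf(req.getMethod()))
        //         .uri(this.resourceServerUri + resourcePath)
        //         .attributes(oauth2AuthorizedClient(authorizedClient))
        //         // .accept(MediaType.APPLICATION_JSON)
        //         .retrieve()
        //         .toEntityFlux(DataBuffer.class);
        // return mono;

        // Forward the incoming request body and block until the upstream response headers arrive
        ResponseEntity<Flux<DataBuffer>> responseEntity = this
                .webClient
                .method(HttpMethod.valueOf(req.getMethod()))
                .uri(this.resourceServerUri + resourcePath)
                .body(BodyInserters.fromDataBuffers(
                        DataBufferUtils.read(new InputStreamResource(req.getInputStream()), bufferFactory, 4096)))
                .attributes(oauth2AuthorizedClient(authorizedClient))
                .retrieve()
                .toEntityFlux(DataBuffer.class)
                .block();

        // Reactor operators return a new Flux, so the result of timeout() has to be kept and used.
        // Fails with a TimeoutException if no buffer arrives for 90 seconds.
        Flux<DataBuffer> body = responseEntity
                .getBody()
                // .timeout(Duration.ofSeconds(90), Flux.empty());
                .timeout(Duration.ofSeconds(90));

        HttpHeaders headers = responseEntity.getHeaders();

        StreamingResponseBody streamer = (outputStream) -> {
            Flux<DataBuffer> flux = DataBufferUtils
                    .write(body, outputStream)
                    // .timeout(Duration.ofSeconds(120), Flux.error(() -> new IOException("Flux proxy write timeout")))
                    .timeout(Duration.ofSeconds(120)) // TimeoutException if no buffer is written for 120 seconds
                    .publish()
                    .autoConnect(2); // expect two subscribers (.subscribe and .blockLast)
            flux.subscribe(DataBufferUtils.releaseConsumer()); // release written buffers to avoid a memory leak
            try {
                // Once this call returns, the streamer lambda returns too,
                // so Spring can then close the outputStream it has given us
                flux.blockLast();
            } catch (RuntimeException ex) {
                Throwable cause = ex.getCause();
                if (cause instanceof TimeoutException) {
                    throw new IOException("Flux proxy timeout", cause);
                }
                throw ex;
            }
        };

        return ResponseEntity.ok().headers(headers)
                // .contentLength(headers.getContentLength())
                // .contentType(headers.getContentType())
                .body(streamer);
    }
}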
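The core of the controller above is a blocking bridge: subscribe to an asynchronous stream of byte chunks, write each chunk to the OutputStream, and keep the servlet thread blocked until the stream completes, fails, or a timeout expires. The following is a minimal sketch of that pattern in isolation. It is an assumption-laden illustration, not the Spring or Reactor API: it uses the JDK's java.util.concurrent.Flow in place of Flux, ByteBuffer in place of DataBuffer, and a single overall timeout instead of Reactor's per-item timeout. The names FlowToStreamSketch, copyBlocking and demo are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of Spring or Reactor.
final class FlowToStreamSketch {

    /** Writes every chunk to out and blocks until completion, failure, or timeout. */
    public static void copyBlocking(Flow.Publisher<ByteBuffer> source, OutputStream out, Duration timeout)
            throws IOException {
        CompletableFuture<Void> done = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<ByteBuffer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // pull one chunk at a time
            }

            @Override
            public void onNext(ByteBuffer chunk) {
                if (done.isDone()) { // the caller gave up (timeout): stop the upstream
                    subscription.cancel();
                    return;
                }
                try {
                    byte[] bytes = new byte[chunk.remaining()];
                    chunk.get(bytes);
                    out.write(bytes);
                    subscription.request(1);
                } catch (IOException e) {
                    subscription.cancel();
                    done.completeExceptionally(e);
                }
            }

            @Override
            public void onError(Throwable t) {
                done.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                done.complete(null);
            }
        });
        try {
            done.get(timeout.toMillis(), TimeUnit.MILLISECONDS); // block the calling thread
        } catch (TimeoutException e) {
            done.cancel(false); // marks the future done so the next onNext cancels the subscription
            throw new IOException("proxy write timeout", e);
        } catch (ExecutionException e) {
            throw new IOException("streaming failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while streaming", e);
        }
    }

    /** Demo source: emits two chunks asynchronously to its subscriber, then completes. */
    public static byte[] demo() throws IOException {
        Flow.Publisher<ByteBuffer> source = subscriber -> {
            SubmissionPublisher<ByteBuffer> publisher = new SubmissionPublisher<>();
            publisher.subscribe(subscriber);
            publisher.submit(ByteBuffer.wrap("hello ".getBytes(StandardCharsets.UTF_8)));
            publisher.submit(ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)));
            publisher.close(); // signals onComplete after the buffered chunks are delivered
        };
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        copyBlocking(source, out, Duration.ofSeconds(5));
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(new String(demo(), StandardCharsets.UTF_8));
    }
}
```

In this sketch the thread that calls copyBlocking plays the role of the servlet thread running the StreamingResponseBody, while the chunks are written on the publisher's worker threads. After a timeout, a write that is already in progress may still finish, so the stream should not be reused afterwards.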
However, when I switched to WebFlux, Netty turned out to handle the streaming just fine:
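import static org.springframework.security.oauth2.client.web.reactive.function.client.ServerOAuth2AuthorizedClientExchangeFilterFunction.clientRegistrationId;

import java.time.Duration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.ResponseEntity;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class ResourcePassthroughController {

    private final WebClient webClient;

    @Value("${app.main-oauth2-client-registration}")
    String oauth2ClientRegistration;

    public ResourcePassthroughController(WebClient webClient) {
        this.webClient = webClient;
    }

    @RequestMapping(value = "/resource/{*path}")
    public Mono<ResponseEntity<Flux<DataBuffer>>> getResource( // this return type does NOT work with servlet, may be a missing decoder
            // @RegisteredOAuth2AuthorizedClient("${app.main-oauth2-client-id}") OAuth2AuthorizedClient authorizedClient,
            ServerHttpRequest request,
            @PathVariable("path") String resourcePath,
            @RequestParam MultiValueMap<String, String> queryMap) {
        return this.webClient
                .method(request.getMethod())
                .uri(builder -> builder
                        .path(resourcePath) // append to path defined in WebClientConfig
                        .queryParams(queryMap)
                        .build())
                .body(BodyInserters.fromDataBuffers(request.getBody())) // forward the request body as it arrives
                .attributes(clientRegistrationId(oauth2ClientRegistration)) // OAuth2AuthorizedClient extracted from current logged-in user
                .retrieve()
                .toEntityFlux(DataBuffer.class)
                .timeout(Duration.ofSeconds(90)); // applies to the Mono, i.e. to receiving the response entity
    }
}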
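For now, though, I would really like to stay on the servlet stack and somehow handle the asynchronous part correctly there, that is, the Flux of my DataBuffers (which are also ByteBuffers). Is there a good way to do this?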
Solution