首页 > 解决方案 > Spring Web Flux - WebClient - exchange - block - 如何释放连接到连接池?

问题描述

当前的:

我正在使用spring-webflux-5.2.8.RELEASE,这工作“很好”:

httpStatus = webClient
    .post()
    .uri(someUri)
    .headers(someHeaders)
    .bodyValue(someBody)
    .exchange()
    .map(ClientResponse::statusCode)
    .timeout(someTimeout)
    .doOnError(doSomething())
    .onErrorResume(ex -> Mono.empty())
    .block();

问题:

当返回错误时,没有问题,因为连接被破坏并且没有放回连接池中:

调试 rnresources.PooledConnectionProvider - [id: 0xa23f78ad, L:/127.0.0.1:7524 !R:localhost/127.0.0.1:8443] 通道关闭,现在有 0 个活动连接和 0 个非活动连接

但是当我得到一个成功的响应时,下一个帖子将失败/超时:

java.util.concurrent.TimeoutException:在“map”中 10000 毫秒内没有观察到任何项目或终端信号(并且没有配置回退)

由于我需要进行故障排除,我使用了一个只有 1 个连接的修复连接池,如下所示:

@Bean
public WebClient.Builder webClientBuilder(){
    
    HttpClient httpClient = HttpClient.create(ConnectionProvider.create("pool", 1));
    
    return WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(httpClient));
}

我猜(我可能是错的,因为我对 webclient 和响应式世界完全陌生)问题是它在成功获得响应后没有释放连接,我认为这是由于这个原因。

当通过 WebClient exchange() 方法使用 ClientResponse 时,您必须确保主体被消耗或释放......

什么已经尝试过:

我试着像这样做一个 releaseBody() (但这不起作用,因为下一篇文章仍然失败):

.map(clientResponse -> { 
    HttpStatus statusCode = clientResponse.statusCode();
    clientResponse.releaseBody();
    return statusCode;
})

PS我需要使用block()。这无法更改,因为我需要响应才能继续前进,我只需要获取状态代码。我正在使用 WebClient,因为我在某处读到了将弃用其余模板以支持 ... WebClient。我希望有人能帮帮忙。

更新1:

我启用了指标,并且确实没有释放连接:

reactor_netty_connection_provider_fixedPool_total_connections{id="1591603494",remote_address="localhost:8443",} 1.0 reactor_netty_connection_provider_fixedPool_active_connections{id="1591603494",remote_address="localhost:8443",} 1.0

更新 2

发现这个:https://github.com/spring-projects/spring-framework/issues/20474

更新 3

我尝试使用默认连接数(即 500),我注意到在我发布每个帖子后活动连接数不断增加:-(

[reactor-http-nio-2] 调试 rnresources.PooledConnectionProvider - [id: 0x2316e048, L:/127.0.0.1:32787 - R:localhost/127.0.0.1:8443] 通道已连接,现在有 7 个活动连接和 0 个非活动连接

标签: spring-webfluxreactor-netty

解决方案


这对我有用:

HttpStatus httpStatus = null;
Mono<HttpStatus> monoHttpStatus  = null;
:           
WebClient webClient = xxx.getWebClient();
:       
try {
    monoHttpStatus  = webClient
        .post()
        .uri(someUri)
        .headers(someHeaders)
        .bodyValue(someBody)
        .exchange()
        .map(clientResponse -> 
            clientResponse.releaseBody().thenReturn(clientResponse.statusCode()))
        .timeout(someTimeout)
        .doOnError(Exception.class, e -> logger.error("An exception has occurred: ", e))
        .onErrorResume(ex -> Mono.empty())
        .block();
                
        if(monoHttpStatus != null)
            httpStatus = monoHttpStatus.block();            
}
:

//SomeOtherClass
@Autowired
private WebClient.Builder webClientBuilder;
public WebClient getWebClient() {
        
    :
    
    webClient= webClientBuilder.baseUrl(baseUrl)
                   .build();
    :   
    return webClient;
}

我现在在帖子完成后看到这个:

DEBUG reactor.util.Loggers$Slf4JLogger.debug[249] [reactor-http-nio-1] [id: 0x6b0568a3, L:/127.0.0.1:10168 - R:localhost/127.0.0.1:8443] 发布频道
DEBUG reactor .util.Loggers$Slf4JLogger.debug[254] [reactor-http-nio-1] [id: 0x6b0568a3, L:/127.0.0.1:10168 - R:localhost/127.0.0.1:8443] 通道已清理,现在 0 处于活动状态连接和 1 个非活动连接


推荐阅读