Spring reactive WebClient fails with "WebClientRequestException: Pending acquire queue has reached its maximum size of 1000"

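Problem description

I am running a load test against a microservice API that calls another microservice API through Spring Reactive WebClient. I am using the Postman runner tab to drive the test.

First I ran the load with 1500 iterations; every request called the second microservice and everything worked as expected. But when I ran the load with 5000 iterations, the second microservice was called 3500 times and the remaining 1500 calls failed with:

WebClientRequestException: Pending acquire queue has reached its maximum size of 1000

I am using org.springframework.web.reactive.function.client.WebClient with the default configuration; the code snippet is below.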

    private WebClient webClient;

    @PostConstruct
    public void init() {
        this.webClient = WebClient.builder()
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

What can be done to avoid this?

I am using the latest spring-boot-starter-parent dependency (version 2.5.3) and the spring-webflux-5.3.9.jar jar.

Logs:

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
        at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
        at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)
        at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:375)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:296)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:885)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: org.springframework.web.reactive.function.client.WebClientRequestException: Pending acquire queue has reached its maximum size of 1000; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
        |_ checkpoint ⇢ Request to POST http://172.20.0.2:3130/v1/login/mobile [DefaultWebClient]
Stack trace:
                at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
                at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
                at reactor.core.publisher.Mono.subscribe(Mono.java:4338)
                at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
                at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
                at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:414)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
                at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
                at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
                at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
                at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:304)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:172)
                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:444)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)
                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)
                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)
                at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)
                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
                at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
                at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
                at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:216)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:269)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282)
                at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:861)
                at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
                at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
                at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
                at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
                at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)
        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)
        at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
        at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)

Tags: spring-webflux, spring-webclient, reactor-netty

Solution


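WebClient needs an HTTP client library to execute requests, and by default it uses Reactor Netty.

Quoting the Reactor Netty reference documentation:

By default, the Reactor Netty client uses a "fixed" connection pool with 500 as the maximum number of active channels and 1000 as the maximum number of further channel acquisition attempts allowed to be kept in a pending state (for the rest of the configuration, check the system properties or the builder configuration below). This means that the implementation creates a new channel if someone tries to acquire a channel, as long as fewer than 500 have been created and are managed by the pool. When the maximum number of channels in the pool is reached, up to 1000 new attempts to acquire a channel are delayed (pending) until a channel is returned to the pool again, and further attempts are rejected with an error.

What you are seeing is that all 500 connections in the pool are in active use and the "pending" queue is already full with 1000 waiting requests, so every further acquisition attempt is rejected with PoolAcquirePendingLimitException.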
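The admission rule in the quoted documentation can be sketched as plain arithmetic. This is only a toy model under a strong assumption: all requests arrive at the same instant and no connection is released during the burst, so it gives a worst case rather than the numbers observed in the question (where connections were returned to the pool while the run was in progress, and fewer calls failed). The class `PoolAdmission` and its method `offer` are hypothetical names used for illustration; they are not part of Reactor Netty.

```java
public class PoolAdmission {

    /** How a burst of simultaneous requests is split by a fixed pool. */
    public static final class Outcome {
        public final int active;   // requests that obtained a connection
        public final int pending;  // requests parked in the pending acquire queue
        public final int rejected; // requests refused because the queue was full

        Outcome(int active, int pending, int rejected) {
            this.active = active;
            this.pending = pending;
            this.rejected = rejected;
        }
    }

    /**
     * Worst-case split, assuming no connection is released during the burst.
     * maxConnections and pendingMax mirror the 500 / 1000 defaults quoted above.
     */
    public static Outcome offer(int requests, int maxConnections, int pendingMax) {
        int active = Math.min(requests, maxConnections);
        int pending = Math.min(requests - active, pendingMax);
        int rejected = requests - active - pending;
        return new Outcome(active, pending, rejected);
    }

    public static void main(String[] args) {
        Outcome o = offer(5000, 500, 1000);
        // prints "500 active, 1000 pending, 3500 rejected"
        System.out.println(o.active + " active, " + o.pending + " pending, "
                + o.rejected + " rejected");
    }
}
```

Under the same assumption a burst of 1500 requests fits exactly (500 active plus 1000 pending), which is consistent with the 1500-iteration run succeeding.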

You have two options to fix this:

Vertical scaling

Increase the connection pool size and/or the length of the pending acquire queue.

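import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

// Custom pool: raise the connection cap and/or the pending acquire queue length.
ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
        .maxConnections(<your_desired_max_connections>)
        .pendingAcquireMaxCount(<your_desired_pending_queue_size>)
        .build();
ReactorClientHttpConnector clientHttpConnector =
        new ReactorClientHttpConnector(HttpClient.create(connectionProvider));
WebClient webClient = WebClient.builder()
        .clientConnector(clientHttpConnector)
        .build();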

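Horizontal scaling

Create additional instances of your application and load balance the API calls across them.

Spring reference documentation

Additional notes:

When sizing the connection pool, it is worth taking the latency of the downstream API call into account. A good starting point is

connection_pool_size = tps * downstream_api_latency

where tps is transactions per second and downstream_api_latency is expressed in seconds.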
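As a worked example of the sizing formula, here is a minimal sketch. The class `PoolSizing`, the method `estimatePoolSize`, and the traffic figures are assumptions chosen for illustration, not measurements from the question. Latency is taken in milliseconds and the result is rounded up, so the formula becomes tps * latency_ms / 1000.

```java
public class PoolSizing {

    /**
     * Starting-point estimate: connections needed = tps * latency (in seconds).
     * Latency is passed in milliseconds and the result is rounded up, using
     * integer arithmetic to avoid floating-point surprises.
     */
    public static long estimatePoolSize(long tps, long latencyMillis) {
        if (tps < 0 || latencyMillis < 0) {
            throw new IllegalArgumentException("tps and latency must be non-negative");
        }
        return (tps * latencyMillis + 999) / 1000;
    }

    public static void main(String[] args) {
        // Hypothetical load: 1000 requests/s against a downstream call taking 200 ms.
        System.out.println(estimatePoolSize(1000, 200)); // prints 200
        // Hypothetical load: 250 requests/s against a downstream call taking 150 ms.
        System.out.println(estimatePoolSize(250, 150));  // prints 38 (37.5 rounded up)
    }
}
```

The estimate is only a starting point: it uses a single latency figure, so some headroom on top of it is usually sensible when latency spikes under load.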

