Spring 5 WebClient throws java.util.concurrent.RejectedExecutionException

Problem description

I am using Spring WebClient to communicate with other web services from a Java 11 / Spring Boot 2.2.6 web application.

Here is the bean configuration for my WebClient:

@Bean
public WebClient.Builder webClientBuilder() {
    String connectionProviderName = "customConnectionProvider";
    int maxConnections = 1000;
    int acquireTimeout = 45;
    ConnectionProvider connectionProvider = ConnectionProvider.builder(connectionProviderName)
            .maxConnections(maxConnections)
            .pendingAcquireTimeout(Duration.ofSeconds(acquireTimeout))
            .build();

    HttpClient httpClient = HttpClient.create(connectionProvider);
    return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient));
}

private Mono<ClientHttpResponse> getWebClientFactory(HttpMethod httpMethod, URI uri, Function<? super ClientHttpRequest, Mono<Void>> monoFunction) {
    TcpClient tcpClient = TcpClient
            .create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
            .doOnConnected(connection -> {
                connection.addHandlerLast(new ReadTimeoutHandler(300000, TimeUnit.MILLISECONDS));
                connection.addHandlerLast(new WriteTimeoutHandler(300000, TimeUnit.MILLISECONDS));
            });
    return new ReactorClientHttpConnector(HttpClient.from(tcpClient)).connect(httpMethod, uri, monoFunction);
}

I use WebClient to make blocking calls, as shown below, because the rest of my application is blocking (servlet-based):

webClient.post().uri("http://localhost:8080/api", iceId).bodyValue(request).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(String.class).block();

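Problem: when the application serves a small number of requests, this approach seems to work without any issues. Once the load exceeds 20 requests per second, however, the webClient call above starts throwing java.util.concurrent.RejectedExecutionException for some calls, and then starts working again after a while.

The stack trace looks like this: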

java.util.concurrent.RejectedExecutionException: event executor terminated
    at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926) ~[netty-common-4.1.48.Final.jar!/:4.1.48.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353) ~[netty-common-4.1.48.Final.jar!/:4.1.48.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346) ~[netty-common-4.1.48.Final.jar!/:4.1.48.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828) ~[netty-common-4.1.48.Final.jar!/:4.1.48.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818) ~[netty-common-4.1.48.Final.jar!/:4.1.48.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:471) ~[netty-transport-4.1.48.Final.jar!/:4.1.48.Final]
    at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87) ~[netty-transport-4.1.48.Final.jar!/:4.1.48.Final]
    at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81) ~[netty-transport-4.1.48.Final.jar!/:4.1.48.Final]
    at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323) ~[netty-transport-4.1.48.Final.jar!/:4.1.48.Final]
    at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[netty-transport-4.1.48.Final.jar!/:4.1.48.Final]
    at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) ~[netty-transport-4.1.48.Final.jar!/:4.1.48.Final]
    at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel$0(PooledConnectionProvider.java:224) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4210) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4316) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4182) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4118) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.netty.internal.shaded.reactor.pool.SimplePool.drainLoop(SimplePool.java:201) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.netty.internal.shaded.reactor.pool.SimplePool.drain(SimplePool.java:172) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.netty.internal.shaded.reactor.pool.SimplePool.doAcquire(SimplePool.java:132) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:351) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:498) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.netty.internal.shaded.reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:323) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:199) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$3(PooledConnectionProvider.java:160) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:320) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.resubscribe(FluxRetryPredicate.java:124) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.MonoRetryPredicate.subscribeOrReturn(MonoRetryPredicate.java:51) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:48) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:323) ~[reactor-netty-0.9.6.RELEASE.jar!/:0.9.6.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4210) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1665) ~[reactor-core-3.3.4.RELEASE.jar!/:3.3.4.RELEASE]
    ... calling code ...
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.2.5.RELEASE.jar!/:5.2.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) ~[spring-aop-5.2.5.RELEASE.jar!/:5.2.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.5.RELEASE.jar!/:5.2.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.5.RELEASE.jar!/:5.2.5.RELEASE]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:91) ~[spring-retry-1.2.5.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.5.RELEASE.jar!/:na]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:118) ~[spring-retry-1.2.5.RELEASE.jar!/:na]
    at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:153) ~[spring-retry-1.2.5.RELEASE.jar!/:na]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.5.RELEASE.jar!/:5.2.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.5.RELEASE.jar!/:5.2.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691) ~[spring-aop-5.2.5.RELEASE.jar!/:5.2.5.RELEASE]
    ... calling code ...
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

This seems like a very common use of WebClient. Has anyone else run into a similar problem? If so, how did you solve it?

Tags: spring, netty, spring-webclient, reactor-netty

Solution


I think you meant to set the read and write timeouts to 300 seconds in the getWebClientFactory method. Note, however, that getWebClientFactory is neither called anywhere in the code you posted nor exposed as a bean, so those timeouts never reach the WebClient built by webClientBuilder. Even if you did expose that method, it builds a separate TcpClient and connector on every call instead of going through your custom ConnectionProvider, so you would likely run into HTTP/TCP connection pool problems, as described by this user in this comment.

Try the following configuration instead:

@Bean
public WebClient.Builder webClientBuilder() {
    String connectionProviderName = "customConnectionProvider";
    int maxConnections = 1000;
    int acquireTimeout = 45; // seconds
    ConnectionProvider connectionProvider = ConnectionProvider.builder(connectionProviderName)
            .maxConnections(maxConnections)
            .pendingAcquireTimeout(Duration.ofSeconds(acquireTimeout))
            .build();

    // Configure the timeouts on the pooled HttpClient itself, not on a separate TcpClient.
    HttpClient httpClient = HttpClient.create(connectionProvider)
            .tcpConfiguration(tcpClient -> tcpClient
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000) // 60 s connect timeout
                    .doOnConnected(connection -> {
                        // The single-int constructors take seconds: 300 s read/write timeouts.
                        connection.addHandlerLast(new ReadTimeoutHandler(300));
                        connection.addHandlerLast(new WriteTimeoutHandler(300));
                    }));
    return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient));
}
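
As background on the exception itself: "event executor terminated" is the message Netty attaches when a task is handed to an event loop that has already been shut down. The sketch below is only a JDK analogy of that behaviour, not reactor-netty code, and it does not claim to reproduce the cause in the question. The class name TerminatedExecutorDemo and the helper isRejected are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class TerminatedExecutorDemo {

    /** Returns true if the executor refuses a new (no-op) task. */
    public static boolean isRejected(ExecutorService executor) {
        try {
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            // Thrown by the default rejection policy once the executor is shut down.
            return true;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        System.out.println("before shutdown, rejected = " + isRejected(executor));
        executor.shutdown();
        System.out.println("after shutdown, rejected = " + isRejected(executor));
    }
}
```

If the analogy holds for your application, the thing to look for is code that shuts down or replaces the client's event loop resources while requests are still being issued.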
