Why does a blocking call work with thenApplyAsync but not with thenApply?

Problem description

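We are seeing some interesting behavior in our application, and the Spock specification below captures it. I am trying to understand why the second test succeeds while the first ends in a TimeoutException.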

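Summary: a mock server exposes a mock endpoint that responds successfully after a 10 ms delay. We use AsyncHttpClient to make a non-blocking call to this endpoint. That first call is chained with a second, blocking call to the same endpoint. The first call succeeds, but the second call times out when it is chained with thenApply and succeeds when it is chained with thenApplyAsync. In both cases the mock server appears to respond within 10 ms.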

Dependencies:


    implementation 'com.google.guava:guava:29.0-jre'
    implementation 'org.asynchttpclient:async-http-client:2.12.1'

    // Use the latest Groovy version for Spock testing
    testImplementation 'org.codehaus.groovy:groovy-all:2.5.11'

    // Use the awesome Spock testing and specification framework even with Java
    testImplementation 'org.spockframework:spock-core:1.3-groovy-2.5'
    testImplementation 'org.objenesis:objenesis:1.4'
    testImplementation "cglib:cglib:2.2"
    testImplementation 'junit:junit:4.13'
    testImplementation 'org.mock-server:mockserver-netty:5.11.1'

Spock specification:


package com.switchcase.asyncthroughput

import com.google.common.base.Charsets
import org.asynchttpclient.DefaultAsyncHttpClient
import org.asynchttpclient.RequestBuilder
import org.mockserver.integration.ClientAndServer
import org.mockserver.model.HttpResponse
import spock.lang.Shared
import spock.lang.Specification

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

import static org.mockserver.integration.ClientAndServer.startClientAndServer
import static org.mockserver.model.HttpRequest.request

class CompletableFutureThreadsTest extends Specification {

    @Shared
    ClientAndServer mockServer

    def asyncHttpClient = new DefaultAsyncHttpClient();

    def setupSpec() {
        mockServer = startClientAndServer(9192);
        // Create a mock endpoint that responds with "done" after a 10 ms delay.
        mockServer.when(request()
                .withMethod("POST")
                .withPath("/validate"))
                .respond(HttpResponse.response().withBody("done")
                        .withStatusCode(200)
                        .withDelay(TimeUnit.MILLISECONDS, 10));
    }

    def "Calls external using AHC with a blocking call with 1sec timeout results in TimeoutException."() {
        when:
        callExternal().thenApply({ resp -> callExternalBlocking() }).join()

        then:
        def exception = thrown(CompletionException)
        exception instanceof CompletionException
        exception.getCause() instanceof TimeoutException
        exception.printStackTrace()
    }

    def "Calls external using AHC with a blocking call on ForkJoinPool with 1sec timeout results in success."() {
        when:
        def value = callExternal().thenApplyAsync({ resp -> callExternalBlocking() }).join()

        then:
        value == "done"
    }

    def cleanupSpec() {
        mockServer.stop(true)
    }

    private CompletableFuture<String> callExternal(def timeout = 1000) {
        RequestBuilder requestBuilder = RequestBuilder.newInstance();
        requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
        def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
        return cf.thenApply({ response ->
            println("CallExternal Succeeded.")
            return response.getResponseBody(Charsets.UTF_8)
        })
    }

    private String callExternalBlocking(def timeout = 1000) {
        RequestBuilder requestBuilder = RequestBuilder.newInstance();
        requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
        def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
        return cf.thenApply({ response ->
            println("CallExternalBlocking Succeeded.")
            return response.getResponseBody(Charsets.UTF_8)
        }).join()
    }
}

Edit:

Debug log and stack trace for the timeout (the timeout occurs in the remote call made by callExternalBlocking):

17:37:38.885 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.timeout.TimeoutTimerTask - Request timeout to localhost/127.0.0.1:9192 after 1000 ms for NettyResponseFuture{currentRetry=0,
    isDone=0,
    isCancelled=0,
    asyncHandler=org.asynchttpclient.AsyncCompletionHandlerBase@478251c9,
    nettyRequest=org.asynchttpclient.netty.request.NettyRequest@4945b749,
    future=java.util.concurrent.CompletableFuture@4d7a3ab9[Not completed, 1 dependents],
    uri=http://localhost:9192/validate,
    keepAlive=true,
    redirectCount=0,
    timeoutsHolder=org.asynchttpclient.netty.timeout.TimeoutsHolder@878bd72,
    inAuth=0,
    touch=1622248657866} after 1019 ms
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.channel.ChannelManager - Closing Channel [id: 0x5485056c, L:/127.0.0.1:58076 - R:localhost/127.0.0.1:9192] 
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.request.NettyRequestSender - Aborting Future NettyResponseFuture{currentRetry=0,
    isDone=0,
    isCancelled=0,
    asyncHandler=org.asynchttpclient.AsyncCompletionHandlerBase@478251c9,
    nettyRequest=org.asynchttpclient.netty.request.NettyRequest@4945b749,
    future=java.util.concurrent.CompletableFuture@4d7a3ab9[Not completed, 1 dependents],
    uri=http://localhost:9192/validate,
    keepAlive=true,
    redirectCount=0,
    timeoutsHolder=org.asynchttpclient.netty.timeout.TimeoutsHolder@878bd72,
    inAuth=0,
    touch=1622248657866}

java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.asynchttpclient.netty.NettyResponseFuture.abort(NettyResponseFuture.java:273)
    at org.asynchttpclient.netty.request.NettyRequestSender.abort(NettyRequestSender.java:473)
    at org.asynchttpclient.netty.timeout.TimeoutTimerTask.expire(TimeoutTimerTask.java:43)
    at org.asynchttpclient.netty.timeout.RequestTimeoutTimerTask.run(RequestTimeoutTimerTask.java:50)
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
    at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
    at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    ... 7 more

Tags: java, completable-future, asynchttpclient

Solution
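
A likely explanation, sketched here under stated assumptions: a callback registered with thenApply before the future completes runs on whichever thread completes that future. With AsyncHttpClient that is presumably a Netty I/O thread, so callExternalBlocking() would block the very thread that has to deliver the second response, and the request timer eventually fires. This is consistent with the log above, where the future is still "Not completed, 1 dependents" when the timer thread aborts it. thenApplyAsync instead hands the callback to ForkJoinPool.commonPool(), which leaves the I/O thread free. The sketch below does not use AsyncHttpClient: the class EventLoopBlockingDemo, its methods, and the single-thread executor standing in for the I/O thread are all hypothetical, and the timeout is 500 ms rather than 1000 ms.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of AsyncHttpClient or the original spec.
class EventLoopBlockingDemo {

    // Stand-in for a non-blocking HTTP call (assumption): the "response"
    // can only be delivered by a task that runs on the event-loop thread.
    static CompletableFuture<String> call(ExecutorService eventLoop) {
        CompletableFuture<String> cf = new CompletableFuture<>();
        eventLoop.execute(() -> cf.complete("done"));
        return cf;
    }

    // Stand-in for callExternalBlocking(): waits up to 500 ms for the response
    // and reports a timeout as a plain string to keep the demo simple.
    static String callBlocking(ExecutorService eventLoop) {
        try {
            return call(eventLoop).get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (Exception e) {
            throw new CompletionException(e);
        }
    }

    static String run(boolean async) {
        // A single thread plays the role of the client's I/O thread.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> first = new CompletableFuture<>();
            // Register the dependent callback before the first future completes.
            CompletableFuture<String> chained = async
                    ? first.thenApplyAsync(r -> callBlocking(eventLoop))
                    : first.thenApply(r -> callBlocking(eventLoop));
            // Complete the first future on the event-loop thread. With thenApply
            // the callback then runs on that same thread and blocks it, so the
            // task that would deliver the second response stays queued.
            eventLoop.execute(() -> first.complete("done"));
            return chained.join();
        } finally {
            eventLoop.shutdown(); // queued tasks still run; the thread then exits
        }
    }

    public static void main(String[] args) {
        System.out.println("thenApply: " + run(false));
        System.out.println("thenApplyAsync: " + run(true));
    }
}
```

In this sketch the thenApply variant reports a timeout and the thenApplyAsync variant reports done, mirroring the two tests. If the same mechanism is at work in the spec, chaining with thenCompose and returning the second call's CompletableFuture without calling join() would avoid blocking any thread at all.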

