java - Why does blocking inside thenApplyAsync work, but not inside thenApply
Problem description
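We are seeing some interesting behavior in our application. The following Spock specification captures it. I am trying to understand why the second test passes but the first one throws a TimeoutException.
Summary: there is a mock server with a mock endpoint that responds with success after a 10 ms delay. We use AsyncHttpClient to make a non-blocking call to this mock endpoint. The first call is chained with a second, blocking call to the same endpoint. The first call succeeds, but the second call fails with a timeout if thenApply is used and succeeds if thenApplyAsync is used. In both cases the mock server appears to respond within 10 ms.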
Dependencies:
    implementation 'com.google.guava:guava:29.0-jre'
    implementation 'org.asynchttpclient:async-http-client:2.12.1'
    // Use the latest Groovy version for Spock testing
    testImplementation 'org.codehaus.groovy:groovy-all:2.5.11'
    // Use the awesome Spock testing and specification framework even with Java
    testImplementation 'org.spockframework:spock-core:1.3-groovy-2.5'
    testImplementation 'org.objenesis:objenesis:1.4'
    testImplementation "cglib:cglib:2.2"
    testImplementation 'junit:junit:4.13'
    testImplementation 'org.mock-server:mockserver-netty:5.11.1'
Spock specification:
    package com.switchcase.asyncthroughput

    import com.google.common.base.Charsets
    import org.asynchttpclient.DefaultAsyncHttpClient
    import org.asynchttpclient.RequestBuilder
    import org.mockserver.integration.ClientAndServer
    import org.mockserver.model.HttpResponse
    import spock.lang.Shared
    import spock.lang.Specification

    import java.util.concurrent.CompletableFuture
    import java.util.concurrent.CompletionException
    import java.util.concurrent.ExecutorService
    import java.util.concurrent.Executors
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeoutException

    import static org.mockserver.integration.ClientAndServer.startClientAndServer
    import static org.mockserver.model.HttpRequest.request

    class CompletableFutureThreadsTest extends Specification {

        @Shared
        ClientAndServer mockServer

        def asyncHttpClient = new DefaultAsyncHttpClient();

        def setupSpec() {
            mockServer = startClientAndServer(9192);
            // create a mock server which responds with "done" after 10 ms.
            mockServer.when(request()
                    .withMethod("POST")
                    .withPath("/validate"))
                    .respond(HttpResponse.response().withBody("done")
                            .withStatusCode(200)
                            .withDelay(TimeUnit.MILLISECONDS, 10));
        }

        def "Calls external using AHC with a blocking call with 1sec timeout results in TimeoutException."() {
            when:
            // the blocking second call runs inside a thenApply callback
            callExternal().thenApply({ resp -> callExternalBlocking() }).join()

            then:
            def exception = thrown(CompletionException)
            exception instanceof CompletionException
            exception.getCause() instanceof TimeoutException
            exception.printStackTrace()
        }

        def "Calls external using AHC with a blocking call on ForkJoinPool with 1sec timeout results in success."() {
            when:
            // the blocking second call runs inside a thenApplyAsync callback
            def value = callExternal().thenApplyAsync({ resp -> callExternalBlocking() }).join()

            then:
            value == "done"
        }

        def cleanupSpec() {
            mockServer.stop(true)
        }

        // Non-blocking call: returns a future that completes with the response body.
        private CompletableFuture<String> callExternal(def timeout = 1000) {
            RequestBuilder requestBuilder = RequestBuilder.newInstance();
            requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
            def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
            return cf.thenApply({ response ->
                println("CallExternal Succeeded.")
                return response.getResponseBody(Charsets.UTF_8)
            })
        }

        // Blocking call: same request, but join() blocks the calling thread until the response arrives.
        private String callExternalBlocking(def timeout = 1000) {
            RequestBuilder requestBuilder = RequestBuilder.newInstance();
            requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
            def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
            return cf.thenApply({ response ->
                println("CallExternalBlocking Succeeded.")
                return response.getResponseBody(Charsets.UTF_8)
            }).join()
        }
    }
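The two tests differ only in thenApply versus thenApplyAsync, so it can help to see which thread each variant runs its callback on. The following is a minimal JDK-only sketch, not the AsyncHttpClient code itself: the "fake-io-thread" executor and the class name CallbackThreadDemo are hypothetical stand-ins for the client's I/O thread. It assumes the dependent stage is registered before the source future completes, which is the usual situation for a network call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackThreadDemo {

    // Returns the name of the thread that ran the dependent stage.
    static String callbackThread(boolean async) {
        // Hypothetical stand-in for the HTTP client's I/O thread.
        ExecutorService ioThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io-thread"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            // Register the dependent stage while the source is still incomplete.
            CompletableFuture<String> dependent = async
                    ? source.thenApplyAsync(v -> Thread.currentThread().getName())
                    : source.thenApply(v -> Thread.currentThread().getName());
            // Complete the source from the stand-in I/O thread.
            ioThread.execute(() -> source.complete("done"));
            return dependent.join();
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenApply ran on:      " + callbackThread(false));
        System.out.println("thenApplyAsync ran on: " + callbackThread(true));
    }
}
```

In this sketch the thenApply callback runs on the thread that completed the source future, while the thenApplyAsync callback is handed to the default asynchronous executor (normally ForkJoinPool.commonPool()).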
Edit:
Debug logs and stack trace for the timeout (the timeout occurs in the remote call made inside callExternalBlocking):
17:37:38.885 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.timeout.TimeoutTimerTask - Request timeout to localhost/127.0.0.1:9192 after 1000 ms for NettyResponseFuture{currentRetry=0,
isDone=0,
isCancelled=0,
asyncHandler=org.asynchttpclient.AsyncCompletionHandlerBase@478251c9,
nettyRequest=org.asynchttpclient.netty.request.NettyRequest@4945b749,
future=java.util.concurrent.CompletableFuture@4d7a3ab9[Not completed, 1 dependents],
uri=http://localhost:9192/validate,
keepAlive=true,
redirectCount=0,
timeoutsHolder=org.asynchttpclient.netty.timeout.TimeoutsHolder@878bd72,
inAuth=0,
touch=1622248657866} after 1019 ms
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.channel.ChannelManager - Closing Channel [id: 0x5485056c, L:/127.0.0.1:58076 - R:localhost/127.0.0.1:9192]
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.request.NettyRequestSender - Aborting Future NettyResponseFuture{currentRetry=0,
isDone=0,
isCancelled=0,
asyncHandler=org.asynchttpclient.AsyncCompletionHandlerBase@478251c9,
nettyRequest=org.asynchttpclient.netty.request.NettyRequest@4945b749,
future=java.util.concurrent.CompletableFuture@4d7a3ab9[Not completed, 1 dependents],
uri=http://localhost:9192/validate,
keepAlive=true,
redirectCount=0,
timeoutsHolder=org.asynchttpclient.netty.timeout.TimeoutsHolder@878bd72,
inAuth=0,
touch=1622248657866}
java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.asynchttpclient.netty.NettyResponseFuture.abort(NettyResponseFuture.java:273)
    at org.asynchttpclient.netty.request.NettyRequestSender.abort(NettyRequestSender.java:473)
    at org.asynchttpclient.netty.timeout.TimeoutTimerTask.expire(TimeoutTimerTask.java:43)
    at org.asynchttpclient.netty.timeout.RequestTimeoutTimerTask.run(RequestTimeoutTimerTask.java:50)
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
    at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
    at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    ... 7 more
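The trace shows the second request being aborted by the client's timer thread after 1000 ms, even though the server answers in about 10 ms. One plausible reading, offered here as an assumption rather than a confirmed diagnosis, is that the thenApply callback runs on the same I/O thread that would have to deliver the second response, so join() blocks the only thread able to complete it. The sketch below models that situation with plain JDK classes: the single-threaded scheduler is a hypothetical stand-in for the client's event loop, and the names fakeCall, fakeCallBlocking and EventLoopStallDemo are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventLoopStallDemo {

    // Hypothetical non-blocking call: the single "loop" thread delivers the response after 10 ms.
    static CompletableFuture<String> fakeCall(ScheduledExecutorService loop) {
        CompletableFuture<String> response = new CompletableFuture<>();
        loop.schedule(() -> response.complete("done"), 10, TimeUnit.MILLISECONDS);
        return response;
    }

    // Blocks the calling thread until the response arrives or the timeout expires.
    static String fakeCallBlocking(ScheduledExecutorService loop, long timeoutMs) {
        try {
            return fakeCall(loop).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    static String run(boolean async) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> first = new CompletableFuture<>();
            // Chain the blocking second call before the first response arrives.
            CompletableFuture<String> chained = async
                    ? first.thenApplyAsync(r -> fakeCallBlocking(loop, 500))
                    : first.thenApply(r -> fakeCallBlocking(loop, 500));
            // The first response is delivered by the loop thread.
            loop.schedule(() -> first.complete("done"), 10, TimeUnit.MILLISECONDS);
            return chained.join();
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenApply:      " + run(false));
        System.out.println("thenApplyAsync: " + run(true));
    }
}
```

With thenApply the callback occupies the loop thread, so the task that would complete the second call stays queued behind it until the 500 ms wait gives up. With thenApplyAsync the blocking wait happens on another thread and the loop stays free to deliver the response.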
Solution
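No answer text accompanies the question here. As a hedged suggestion only: if the timeout comes from blocking inside a callback that runs on the client's I/O thread, one option is to avoid blocking altogether and chain the second call with thenCompose. The sketch below uses plain JDK classes; the single-threaded scheduler and the names fakeCall and ComposeDemo are hypothetical stand-ins, not AsyncHttpClient API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ComposeDemo {

    // Hypothetical non-blocking call: the single "loop" thread delivers the response after 10 ms.
    static CompletableFuture<String> fakeCall(ScheduledExecutorService loop) {
        CompletableFuture<String> response = new CompletableFuture<>();
        loop.schedule(() -> response.complete("done"), 10, TimeUnit.MILLISECONDS);
        return response;
    }

    static String run() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            return fakeCall(loop)
                    // The callback only starts the second call and returns its future;
                    // it never waits, so the loop thread is released immediately.
                    .thenCompose(firstBody -> fakeCall(loop))
                    // Blocking happens only here, on the caller's thread.
                    .join();
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Applied to the specification, this would mean chaining a second callExternal() with thenCompose instead of calling callExternalBlocking() inside thenApply. If a blocking call inside the callback is unavoidable, thenApplyAsync with an explicitly supplied executor keeps that wait off the I/O thread.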