java - 异步 Http 客户端 + Netty
问题描述
我在对异步 http 客户端(版本 - 2.4.3)进行基准测试时遇到问题,并在运行下面粘贴的代码时遇到以下异常。仅供参考,我是 Netty 和 async-http-client 的新手,所以请原谅我,以防我的理解有差距。
exception - io.netty.channel.ConnectTimeoutException: connection timed out:
以下是我的假设 -
- 下面的代码将有一个线程(单个事件循环)。
- keepalive 将确保重用 tcp 连接
- tcp 连接的大小由 setMaxConnections & setMaxConnectionsPerHost 决定
- ThrottleRequestFilter 将确保在处理处理程序代码的线程主动使用所有连接时不会发出请求。
以下是我的问题-
1] 虽然我使用 ThrottleRequestFilter 为什么我应该看到连接超时。我的假设是,一旦处理程序处理了响应,则连接应该可以重用,因为我设置了 keepalive,除非我缺少某些东西。
2] 执行 AsyncHandler 代码的默认线程池的大小是多少。
以下是我的测试运行
Total Requests - N = 1000
Max Con = 1000
Max Con/Host = 1000
pcit = 60000
ka = true
以下是指标 -
======================================
Total num of Reqs: 1000
Concurrency : 1000
Total Time in secs: 1.0
HTTP 200 OK: 789
HTTP 200 NOT OK: 211
Total Completed : 1000
rps : 789.0
======================================
public class HttpBm {
final Metrics metrics;
long startTime = System.nanoTime();
public HttpBm(Metrics metrics) {
this.metrics = metrics;
}
private void run(int n, int mc, int mcph, int pcit, boolean ka, String url) {
AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
.addRequestFilter(new ThrottleRequestFilter(mcph))
.setMaxConnections(mc)
.setMaxConnectionsPerHost(mcph)
.setKeepAlive(ka)
.setConnectTimeout(1000)
.setConnectionTtl(500));
for (int r=0;r<n;r++) {
final ListenableFuture<Response> whenResponse = asyncHttpClient.prepareGet(url).execute(new AsyncHandler<Response>() {
private Integer status;
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
if (200 == responseStatus.getStatusCode())
metrics.incrHttp200OK();
else
metrics.incrHttpNon200OK();
return State.ABORT;
}
public State onHeadersReceived(HttpHeaders headers) throws Exception {
return State.ABORT;
}
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
return State.ABORT;
}
public void onThrowable(Throwable t) {
metrics.incrHttpNon200OK();
t.printStackTrace();
}
public Response onCompleted() throws Exception {
return null;
}
});
}
long endTime = System.nanoTime();
boolean done = true;
while (done) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
done = metrics.print(startTime, endTime);
}
}
public static void main(String[] args) {
int n = 1000;
int mc = 1000;
int mcph = 1000;
int pcit = 60000;
String url = "http://somehost:8080/index.html";
if (args != null) {
n = Integer.parseInt(args[0]);
mc = Integer.parseInt(args[1]);
mcph = Integer.parseInt(args[2]);
pcit = Integer.parseInt(args[3]);
url = args[4];
}
System.out.println();
System.out.println("==================================");
System.out.println("Url = " + url);
System.out.println("Total Requests - N = " + n);
System.out.println("Max Con = " + mc);
System.out.println("Max Con/Host = " + mcph);
System.out.println("pcit = " + pcit);
System.out.println("ka = true");
System.out.println("==================================");
HttpBm httpBm = new HttpBm(new Metrics(n, mc));
httpBm.run(n , mc, mcph, pcit, true, url);
}
}
import java.util.concurrent.atomic.AtomicInteger;
public class Metrics {
private int n;
private int c;
private AtomicInteger Http200OK = new AtomicInteger();
private AtomicInteger HttpNon200OK = new AtomicInteger();
public Metrics(int n, int c) {
this.n = n;
this.c = c;
}
public void incrHttp200OK() {
Http200OK.incrementAndGet();
}
public void incrHttpNon200OK() {
HttpNon200OK.incrementAndGet();
}
public boolean isComplete() {
return Http200OK.get() + HttpNon200OK.get() >= n;
}
public boolean print(long startTime, long finishTime) {
final float totalTimeSec = (finishTime - startTime) / 1000000000;
final float rps = this.Http200OK.get() / totalTimeSec;
System.out.println("======================================");
System.out.println("Total num of Reqs:\t" + n);
System.out.println("Concurrency :\t" + c);
System.out.println("Total Time in secs:\t" + totalTimeSec);
System.out.println("HTTP 200 OK:\t" + Http200OK.get());
System.out.println("HTTP 200 NOT OK:\t" + HttpNon200OK.get());
System.out.println("Total Completed :\t" + (Http200OK.get() + HttpNon200OK.get()));
System.out.println("rps : " + rps);
System.out.println("======================================");
return isComplete();
}
}
解决方案
推荐阅读
- google-apps-script - 谷歌脚本和 ESP32
- javascript - 使用 Vega 创建可编辑树图的最佳方法?
- kibana - 计入 savedSearch kibana
- c# - 为什么我在尝试生成重置密码时总是收到无效的 URL?
- octave - 使用 Octave GUI 命令窗口执行 .m 文件会导致错误:没有这样的文件
- angular - 如何在d3js(Angular)中将弦画到一个圆上
- c - 客户端 - 使用信号 SIGUSR1、SIGUSR2 传输文本的服务器程序
- homebrew - 执行“brew tap exolnet/homebrew-deprecated”时出错
- c++ - CMake 在 CMakeLists.txt(托管 C++ 项目)中指定本机构建系统选项
- node.js - 如何使用 fetch api 在 MERN 堆栈中将包含一些文本字段和 csv 文件的表单数据从前端发送到后端?