java - Strange bottleneck with Apache HttpClients (both sync and async)
Problem description
I am trying to test the limits of the Apache HTTP client libraries, but I have run into a strange bottleneck. My test setup consists of the following:
- A wiremock server that simulates a fixed 80 ms delay
- A unit test that uses an org.apache.http.impl.nio.client.CloseableHttpAsyncClient to issue a configurable number of requests to the wiremock server as fast as possible, while collecting statistics
- A unit test that uses an org.apache.http.impl.client.CloseableHttpClient to issue a configurable number of requests to the wiremock server from a configurable number of threads as fast as possible, while collecting statistics
- A unit test that uses an org.springframework.web.reactive.function.client.WebClient to issue a configurable number of requests to the wiremock server as fast as possible, while collecting statistics
All of the tests show the same throughput of about 570 requests/second on my local machine. CPU utilization while they run is very low, around 5%, so I can assume the bottleneck is not the CPU but somewhere else...
My question is: where is this bottleneck, and how can we scale past it?
My system configuration:
- Processor: 3.1 GHz quad-core Intel Core i7
- Memory: 16 GB 2133 MHz LPDDR3
- OS: OSX 10.15.5
- Java version: 11.0.4
My unit test:
package com.blakeparmeter.bottleneck_mystery;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

/**
 * Used to illustrate a performance bottleneck with the apache HttpClients
 */
public class BottleneckTest {

    // Test variables
    final int totalTests = 5_000;
    final long messageInterval = 1000; // ms
    final URI testUri = URI.create("http://localhost:5000/wait/fixed/empty");

    @Test
    public void testSync() throws InterruptedException, ExecutionException {
        final int numThreads = 100;

        // Creates the sync client (unit under test)
        final CloseableHttpClient unitUnderTest = HttpClientBuilder
                .create()
                .setMaxConnTotal(5000)
                .setMaxConnPerRoute(5000)
                .build();

        // Run the test on an executor, send results to a stats aggregator
        final ForkJoinPool executor = new ForkJoinPool(numThreads);
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
        executor.submit(() -> IntStream.range(0, numThreads)
                .parallel()
                .forEach(threadNum -> IntStream.range(0, totalTests / numThreads).forEach(testNum -> {
                    final long runStart = System.currentTimeMillis();
                    try (final CloseableHttpResponse response = unitUnderTest.execute(new HttpGet(testUri))) {
                        // we don't need to do anything with the response, just make sure it's sent.
                    } catch (final IOException e) {
                        Assertions.fail(e);
                    }
                    statsAggregator.addTestDuration(System.currentTimeMillis() - runStart);
                })))
                .get();

        // print the stats one last time (await is not needed since we wait on the executor)
        statsAggregator.printStats();
    }

    @Test
    public void testAsync() throws InterruptedException {
        // Creates the async client (unit under test)
        final CloseableHttpAsyncClient unitUnderTest = HttpAsyncClients.custom()
                .setMaxConnTotal(5000)
                .setMaxConnPerRoute(5000)
                .build();
        unitUnderTest.start();

        // Runs all of the tests, sends results to a stats aggregator
        final CountDownLatch testCountdown = new CountDownLatch(totalTests);
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
        IntStream.range(0, totalTests).forEach(testNum -> {
            final long runStart = System.currentTimeMillis();
            unitUnderTest.execute(new HttpGet(testUri), new FutureCallback<>() {
                @Override
                public void completed(final HttpResponse response) {
                    statsAggregator.addTestDuration(System.currentTimeMillis() - runStart);
                    testCountdown.countDown();
                }

                @Override
                public void failed(final Exception ex) {
                    Assertions.fail(ex.getMessage());
                }

                @Override
                public void cancelled() {
                    Assertions.fail("Http Request Cancelled");
                }
            });
        });

        // await execution then print the stats one last time
        testCountdown.await();
        statsAggregator.printStats();
    }

    @Test
    public void testReactive() {
        final WebClient unitUnderTest = WebClient.builder().build();

        // Runs all of the tests, sends results to a stats aggregator
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
        Flux.range(0, totalTests)
                .flatMap(testNum -> {
                    final long runStart = System.currentTimeMillis();
                    return unitUnderTest.get()
                            .uri(testUri)
                            .retrieve()
                            .bodyToMono(Object.class)
                            .doOnSuccess(obj -> statsAggregator.addTestDuration(System.currentTimeMillis() - runStart));
                })
                .then()
                .block();

        // print the stats one last time
        statsAggregator.printStats();
    }
}
StatsAggregator.java:
package com.blakeparmeter.bottleneck_mystery;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
/**
 * @author Blake L. Parmeter
 */
public class StatsAggregator {

    private static final DecimalFormat testCompleteFormat = new DecimalFormat("###,###,###");
    private static final DecimalFormat avgRequestFormat = new DecimalFormat("###,###.##");

    private final long start = System.currentTimeMillis();
    private final List<Long> times;
    private final TimerTask renderStatisticsTask;

    // Creates a timer task to calculate and render runtime stats in realtime.
    public StatsAggregator(final int totalTests, final long messageIntervalMillis) {
        this.times = new ArrayList<>(totalTests);
        renderStatisticsTask = new TimerTask() {
            private Long lastLogTime = null;
            private Integer lastLogSize = null;

            @Override
            public void run() {
                // Init variables needed for calculations
                final long logTime = System.currentTimeMillis();
                final List<Long> statsCopy;
                synchronized (times) {
                    if (!times.isEmpty()) {
                        statsCopy = new ArrayList<>(times);
                    } else {
                        System.out.println("No statistics have been loaded. Statistics will not be calculated.");
                        return;
                    }
                }
                Collections.sort(statsCopy);

                // print execution completion status
                System.out.println();
                final double percentComplete = ((double) statsCopy.size() / (double) totalTests);
                final long runtime = logTime - start; // ms
                final double estimatedTimeRemaining = ((double) runtime / percentComplete) - (double) runtime; // ms
                System.out.println(testCompleteFormat.format(statsCopy.size())
                        + "\tTests completed of:"
                        + testCompleteFormat.format(totalTests)
                        + "\t"
                        + avgRequestFormat.format(percentComplete * 100)
                        + "% complete. "
                        + "Running for: "
                        + runtime / 1000d
                        + " seconds. "
                        + "Estimated Time remaining: "
                        + testCompleteFormat.format(estimatedTimeRemaining / 1000d)
                        + " seconds.");

                // print running average requests / second
                String sinceLastLogStats = "";
                if (lastLogSize != null && lastLogTime != null) {
                    double numSinceLastLog = (double) statsCopy.size() - lastLogSize;
                    double timeSinceLastLog = (double) logTime - lastLogTime;
                    double avgReqPerSecSinceLastLogSec = 1000 * (numSinceLastLog / timeSinceLastLog);
                    sinceLastLogStats = "\tavg req/sec:"
                            + avgRequestFormat.format(avgReqPerSecSinceLastLogSec)
                            + "(since last run)";
                }
                lastLogSize = statsCopy.size();
                lastLogTime = logTime;
                double avgReqPerSec = 1000 * ((double) statsCopy.size() / (double) (logTime - start));
                System.out.println("\tavg req/sec:"
                        + avgRequestFormat.format(avgReqPerSec)
                        + "(total)"
                        + sinceLastLogStats);

                // print average min and max
                double avg = (double) statsCopy.stream().reduce(Long::sum).orElseThrow() / (double) statsCopy.size();
                System.out.println("\tavg:" + avgRequestFormat.format(avg) +
                        "\tmin:" + statsCopy.get(0) +
                        "\tmax:" + statsCopy.get(statsCopy.size() - 1));

                // print percentiles
                System.out.println("\tRequest duration percentiles:\n" +
                        "\t\t1%:" + percentile(statsCopy, 1) +
                        "\t5%:" + percentile(statsCopy, 5) +
                        "\t10%:" + percentile(statsCopy, 10) +
                        "\t50%:" + percentile(statsCopy, 50) +
                        "\t90%:" + percentile(statsCopy, 90) +
                        "\t95%:" + percentile(statsCopy, 95) +
                        "\t99%:" + percentile(statsCopy, 99) +
                        "\t99.9%:" + percentile(statsCopy, 99.9) +
                        "\t99.99%:" + percentile(statsCopy, 99.99));
                System.out.println("\tCalculations took:" + (System.currentTimeMillis() - logTime) + "ms.");
            }
        };

        // Schedule printing of statistics on a timer
        final Timer timer = new Timer("test-output-timer", true);
        timer.schedule(renderStatisticsTask, messageIntervalMillis, messageIntervalMillis);
    }

    public void printStats() {
        renderStatisticsTask.run();
    }

    public void addTestDuration(final long time) {
        // Synchronize with the reader in renderStatisticsTask; ArrayList is not thread-safe.
        synchronized (times) {
            times.add(time);
        }
    }

    private static long percentile(List<Long> times, double percentile) {
        int index = (int) Math.ceil(percentile / 100.0 * times.size());
        return times.get(index - 1);
    }
}
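The nearest-rank percentile lookup used by StatsAggregator.percentile can be checked in isolation. A small sketch (the demo class name is mine) of the same method: sort the samples, then take the value at 1-based index ceil(p/100 * n):

```java
import java.util.Arrays;
import java.util.List;

public class PercentileDemo {

    // Same nearest-rank method as StatsAggregator.percentile: the element at
    // 1-based index ceil(p/100 * n) of an already-sorted list.
    static long percentile(List<Long> sortedTimes, double percentile) {
        int index = (int) Math.ceil(percentile / 100.0 * sortedTimes.size());
        return sortedTimes.get(index - 1);
    }

    public static void main(String[] args) {
        // Ten sorted request durations in ms
        List<Long> times = Arrays.asList(10L, 20L, 30L, 40L, 50L, 60L, 70L, 80L, 90L, 100L);
        System.out.println("p50=" + percentile(times, 50) + " p90=" + percentile(times, 90));
    }
}
```

Note that the list must already be sorted; StatsAggregator guarantees this by sorting statsCopy before printing percentiles.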
Running wiremock for this test:
- Download the standalone jar from http://wiremock.org/docs/running-standalone/ into a directory, referred to in these instructions as <wiremock_directory>. The downloaded file will be referred to as <wiremock.jar>
- Create the directory <wiremock_directory>/mappings
- Create a file named endpoint.json, with the content below, and place it in <wiremock_directory>/mappings
{
    "request": {
        "method": "GET",
        "urlPathPattern": "/wait/fixed/empty"
    },
    "response": {
        "headers": {
            "Content-Type": "application/json;charset=UTF-8"
        },
        "status": 200,
        "fixedDelayMilliseconds": 80
    }
}
- Run wiremock in a mode that can handle high concurrency, using the command:
java -jar <wiremock.jar> --port 5000 --container-threads 250 --jetty-acceptor-threads 200 --no-request-journal
Note: substitute the name of the downloaded file into the command.
Solution
If you cannot max out your CPU cores, it means your threads are waiting, most likely on IO, which is what happens when you make network calls. In general, once IO comes into play, you are unlikely to produce a CPU-bound load. To find out what your threads are doing, take a thread dump while the test is running and look at their stack traces.
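Besides jstack, a thread dump can also be captured from inside the JVM with the standard Thread.getAllStackTraces() API. A minimal sketch (the class name is mine) that you could trigger, for example, from a scheduled task while the load test runs:

```java
import java.util.Map;

public class ThreadDump {

    // Builds a jstack-style snapshot of every live thread: name, state,
    // and the stack frames it is currently in (waiting frames reveal IO).
    static String dump() {
        final StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            final Thread t = entry.getKey();
            sb.append('"').append(t.getName()).append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(dump());
    }
}
```

Threads stuck in socket reads will show states like WAITING or RUNNABLE inside socket/NIO frames, which is the signature of an IO-bound (not CPU-bound) workload.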
A few more things:
- ForkJoinPool is best avoided unless you are creating tasks that are designed to steal work, i.e. ForkJoinTask. In your case a plain ThreadPoolExecutor would likely perform better.
- You may be shooting yourself in the foot with the parallel stream. You create an executor with many threads, each of which receives a Runnable to execute; that Runnable uses the parallel-stream machinery, which in turn uses the single shared ForkJoinPool with as many threads as there are logical CPU cores. If you have 8 logical cores, that means you have 100 threads driving tasks that all share one 8-thread pool to do their work. (Although if that were the case, and my math is right, you should only be getting about 100 requests per second.)
- Collecting the time of each request can also be costly. I would rather not call System.currentTimeMillis() twice per request. (That said, the cost is most likely negligible compared to the networking the HTTP client is doing.)
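The first suggestion above can be sketched as follows: drive the load from a plain fixed-size pool where each submitted task owns exactly one blocking call, with no parallel-stream machinery involved. The class name is mine, and Thread.sleep(80) stands in for the blocking HTTP call (unitUnderTest.execute(...) in the original test):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolLoad {

    // Drives totalTests blocking calls through a plain fixed-size thread pool
    // and returns how many completed. Each task blocks for ~80 ms, simulating
    // the wiremock server's fixed delay.
    static int runLoad(int numThreads, int totalTests) {
        final AtomicInteger completed = new AtomicInteger();
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < totalTests; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(80); // stand-in for the blocking HTTP request
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        final long start = System.currentTimeMillis();
        final int completed = runLoad(100, 1_000);
        System.out.println(completed + " requests in " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

With 100 threads each blocked for 80 ms, the theoretical ceiling of this setup is 100 / 0.08 = 1,250 requests/second, so the pool itself should not be the limiting factor at 570 requests/second.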