Strange bottleneck with Apache HttpClients (sync and async)

Problem description

I'm trying to test the limits of the Apache HTTP client libraries, but I've run into a strange bottleneck. My test consists of the following:

  1. A WireMock server that simulates a fixed 80 ms delay.

  2. A unit test that uses an org.apache.http.impl.nio.client.CloseableHttpAsyncClient to make a configurable number of requests to the WireMock server as fast as possible, while collecting statistics.

  3. A unit test that uses an org.apache.http.impl.client.CloseableHttpClient to make a configurable number of requests to the WireMock server as fast as possible from a configurable number of threads, while collecting statistics.

  4. A unit test that uses an org.springframework.web.reactive.function.client.WebClient to make a configurable number of requests to the WireMock server as fast as possible, while collecting statistics.

All of the tests show the same performance figure of about 570 requests/second on my local machine. CPU usage while they run is very low, around 5% utilization, so I can assume the bottleneck is not the CPU but somewhere else...

My question is: where is this bottleneck, and how can we push past it?

My system configuration:

My unit test:

package com.blakeparmeter.bottleneck_mystery;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Flux;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

/**
 * Used to illustrate a performance bottleneck with the apache HttpClients
 */
public class BottleneckTest {

    // Test variables
    final int totalTests = 5_000;
    final long messageInterval = 1000; //ms
    final URI testUri =  URI.create("http://localhost:5000/wait/fixed/empty");

    @Test
    public void testSync() throws InterruptedException, ExecutionException {

        final int numThreads = 100;

        // Creates the sync client (unit under test)
        final CloseableHttpClient unitUnderTest = HttpClientBuilder
                .create()
                .setMaxConnTotal(5000)
                .setMaxConnPerRoute(5000)
                .build();

        // Run the test on an executor, send results to a stats aggregator
        final ForkJoinPool executor = new ForkJoinPool(numThreads);
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
        executor.submit(() -> IntStream.range(0, numThreads)
                .parallel()
                .forEach(threadNum -> IntStream.range(0, totalTests / numThreads).forEach(testNum -> {
                    final long runStart = System.currentTimeMillis();
                    try (final CloseableHttpResponse response = unitUnderTest.execute(new HttpGet(testUri))) {
                        // we don't need to do anything with the response, just make sure it's sent.
                    } catch (final IOException e) {
                        Assertions.fail(e);
                    }
                    statsAggregator.addTestDuration(System.currentTimeMillis() - runStart);
                })))
                .get();

        // print the stats one last time (await is not needed since we wait on the executor)
        statsAggregator.printStats();
    }

    @Test
    public void testAsync() throws InterruptedException {

        // Creates the async client (unit under test)
        final CloseableHttpAsyncClient unitUnderTest = HttpAsyncClients.custom()
                .setMaxConnTotal(5000)
                .setMaxConnPerRoute(5000)
                .build();
        unitUnderTest.start();

        // Runs all of the tests, sends results to a stats aggregator
        final CountDownLatch testCountdown = new CountDownLatch(totalTests);
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
        IntStream.range(0, totalTests).forEach(testNum -> {

            final long runStart = System.currentTimeMillis();
            unitUnderTest.execute(new HttpGet(testUri), new FutureCallback<>() {

                @Override
                public void completed(final HttpResponse response) {
                    statsAggregator.addTestDuration(System.currentTimeMillis() - runStart);
                    testCountdown.countDown();
                }

                @Override
                public void failed(final Exception ex) {
                    Assertions.fail(ex.getMessage());
                }

                @Override
                public void cancelled() {
                    Assertions.fail("Http Request Cancelled");
                }
            });
        });

        // await execution then print the stats one last time
        testCountdown.await();
        statsAggregator.printStats();
    }

    @Test
    public void testReactive() {

        final WebClient unitUnderTest = WebClient.builder().build();

        // Runs all of the tests, sends results to a stats aggregator
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
        Flux.range(0, totalTests)
                .flatMap(testNum -> {
                    final long runStart = System.currentTimeMillis();
                    return unitUnderTest.get()
                            .uri(testUri)
                            .retrieve()
                            .bodyToMono(Object.class)
                            .doOnSuccess(obj -> statsAggregator.addTestDuration(System.currentTimeMillis() - runStart));
                })
                .then()
                .block();

        // print the stats one last time
        statsAggregator.printStats();
    }
}

StatsAggregator.java:

package com.blakeparmeter.bottleneck_mystery;

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

/**
 * @author Blake L. Parmeter
 */
public class StatsAggregator {

    private static final DecimalFormat testCompleteFormat = new DecimalFormat("###,###,###");
    private static final DecimalFormat avgRequestFormat = new DecimalFormat("###,###.##");

    private final long start = System.currentTimeMillis();
    private final List<Long> times;
    private final TimerTask renderStatisticsTask;

    // Creates a timer task to calculate and render runtime stats in realtime.
    public StatsAggregator(final int totalTests, final long messageIntervalMillis) {

        this.times = new ArrayList<>(totalTests);

        renderStatisticsTask = new TimerTask() {

            private Long lastLogTime = null;
            private Integer lastLogSize = null;

            @Override
            public void run() {

                // Init variables needed for calculations
                final long logTime = System.currentTimeMillis();
                final List<Long> statsCopy;
                synchronized (times) {
                    if (!times.isEmpty()) {
                        statsCopy = new ArrayList<>(times);
                    } else {
                        System.out.println("No statistics have been loaded. Statistics will not be calculated.");
                        return;
                    }
                }
                Collections.sort(statsCopy);

                // print execution completion status
                System.out.println();
                final double percentComplete = ((double) statsCopy.size() / (double) totalTests);
                final long runtime = logTime - start; //ms
                final double estimatedTimeRemaining = ((double) runtime / percentComplete) - (double) runtime; //ms
                System.out.println(testCompleteFormat.format(statsCopy.size())
                        + "\tTests completed of:"
                        + testCompleteFormat.format(totalTests)
                        + "\t"
                        + avgRequestFormat.format(percentComplete * 100)
                        + "% complete. "
                        + "Running for: "
                        + runtime / 1000d
                        + " seconds. "
                        + "Estimated Time remaining: "
                        + testCompleteFormat.format(estimatedTimeRemaining / 1000d)
                        + " seconds.");

                // print running average requests / second
                String sinceLastLogStats = "";
                if (lastLogSize != null && lastLogTime != null) {
                    double numSinceLastLog = (double) statsCopy.size() - lastLogSize;
                    double timeSinceLastLog = (double) logTime - lastLogTime;
                    double avgReqPerSecSinceLastLogSec = 1000 * (numSinceLastLog / timeSinceLastLog);
                    sinceLastLogStats = "\tavg req/sec:"
                            + avgRequestFormat.format(avgReqPerSecSinceLastLogSec)
                            + "(since last run)";
                }
                lastLogSize = statsCopy.size();
                lastLogTime = logTime;
                double avgReqPerSec = 1000 * ((double) statsCopy.size() / (double) (logTime - start));
                System.out.println("\tavg req/sec:"
                        + avgRequestFormat.format(avgReqPerSec)
                        + "(total)"
                        + sinceLastLogStats);

                // print average min and max
                double avg = (double) statsCopy.stream().reduce(Long::sum).orElseThrow() / (double) statsCopy.size();
                System.out.println("\tavg:" + avgRequestFormat.format(avg) +
                        "\tmin:" + statsCopy.get(0) +
                        "\tmax:" + statsCopy.get(statsCopy.size() - 1));

                // print percentiles
                System.out.println("\tRequest duration percentiles:\n" +
                        "\t\t1%:" + percentile(statsCopy, 1) +
                        "\t5%:" + percentile(statsCopy, 5) +
                        "\t10%:" + percentile(statsCopy, 10) +
                        "\t50%:" + percentile(statsCopy, 50) +
                        "\t90%:" + percentile(statsCopy, 90) +
                        "\t95%:" + percentile(statsCopy, 95) +
                        "\t99%:" + percentile(statsCopy, 99) +
                        "\t99.9%:" + percentile(statsCopy, 99.9) +
                        "\t99.99%:" + percentile(statsCopy, 99.99));

                System.out.println("\tCalculations took:" + (System.currentTimeMillis() - logTime) + "ms.");
            }
        };

        // Schedule printing of statistics on a timer
        final Timer timer = new Timer("test-output-timer", true);
        timer.schedule(renderStatisticsTask, messageIntervalMillis, messageIntervalMillis);
    }

    public void printStats() {
        renderStatisticsTask.run();
    }

    public void addTestDuration(final long time) {
        times.add(time);
    }

    private static long percentile(List<Long> times, double percentile) {
        int index = (int) Math.ceil(percentile / 100.0 * times.size());
        return times.get(index - 1);
    }
}

Running WireMock for this test:

  1. Download the standalone jar from http://wiremock.org/docs/running-standalone/ into a directory that will be referred to as <wiremock_directory> in these instructions. The downloaded file will be referred to as <wiremock.jar>.
  2. Create the directory <wiremock_directory>/mappings
  3. Create a file named endpoint.json in <wiremock_directory>/mappings with the following contents:
{
    "request": {
        "method": "GET",
        "urlPathPattern": "/wait/fixed/empty"
    },
    "response": {
        "headers": {
            "Content-Type": "application/json;charset=UTF-8"
        },
        "status": 200,
        "fixedDelayMilliseconds": 80
    }
}
  4. Run WireMock in a mode that can handle high concurrency with the command: java -jar <wiremock.jar> --port 5000 --container-threads 250 --jetty-acceptor-threads 200 --no-request-journal (note: replace the file name in the command with the file you downloaded).
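If you would rather start WireMock from inside the test JVM instead of running the standalone jar, a rough sketch looks like the following. It assumes the com.github.tomakehurst:wiremock-jre8 test dependency is on the classpath; the class name EmbeddedWireMock is only illustrative, and the options mirror the command-line flags above.

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;

public class EmbeddedWireMock {

    public static void main(String[] args) {
        // Mirrors: --port 5000 --container-threads 250 --jetty-acceptor-threads 200 --no-request-journal
        final WireMockServer server = new WireMockServer(WireMockConfiguration.options()
                .port(5000)
                .containerThreads(250)
                .jettyAcceptors(200)
                .disableRequestJournal());
        server.start();

        // Equivalent of the endpoint.json mapping: GET /wait/fixed/empty with an 80 ms fixed delay
        server.stubFor(WireMock.get(WireMock.urlPathEqualTo("/wait/fixed/empty"))
                .willReturn(WireMock.aResponse()
                        .withStatus(200)
                        .withHeader("Content-Type", "application/json;charset=UTF-8")
                        .withFixedDelay(80)));
    }
}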

Tags: java, performance, apache-httpclient-4.x, asynchttpclient

Solution


If you cannot max out your CPU cores, it means your threads are waiting, most likely on I/O, since you are making network calls. Generally, once I/O comes into play it is hard to create a CPU-bound load. To find out what your threads are actually doing, take a thread dump while the test is running and look at their stack traces.
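You can take that dump with jstack <pid> from the command line, or print it from inside the JVM. A minimal sketch (the StackDumper class name is just an illustration) that writes the stack trace of every live thread, which you could call from a watcher thread while the test runs:

import java.util.Map;

public class StackDumper {

    // Dumps the name, state, and stack trace of every live thread in the current JVM.
    public static void dumpAllThreads() {
        final Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
        for (final Map.Entry<Thread, StackTraceElement[]> entry : stacks.entrySet()) {
            final Thread thread = entry.getKey();
            System.out.println(thread.getName() + " (" + thread.getState() + ")");
            for (final StackTraceElement frame : entry.getValue()) {
                System.out.println("\tat " + frame);
            }
        }
    }
}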

A few more things:

  • ForkJoinPool is best avoided unless you are creating tasks that are designed for work stealing, i.e. ForkJoinTask. In your case a plain ThreadPoolExecutor would probably perform better (see the sketch after this list).
  • You may be shooting yourself in the foot with the parallel stream. You create an executor with many threads, each of which receives a Runnable to execute; that Runnable uses the parallel stream machinery, which in turn uses a single shared ForkJoinPool with as many threads as there are logical CPU cores. If you have 8 logical cores, that means you have 100 threads driving tasks that all share an 8-thread pool to do their work. (Although if that were really the case, if my math is right, you should only be seeing about 100 requests per second.)
  • Collecting the timing of every request can also be costly. I would rather not call System.currentTimeMillis() twice for every request. (Although the cost is most likely negligible compared to the cost of the network requests the HTTP client is making.)
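To illustrate the first two points, here is a rough sketch of the synchronous test driven by a plain fixed-size thread pool instead of a ForkJoinPool plus a parallel stream. It reuses the totalTests, messageInterval and testUri fields and the StatsAggregator from the test above, and assumes the additional imports java.util.concurrent.ExecutorService and java.util.concurrent.Executors; it is a sketch, not a drop-in replacement.

    @Test
    public void testSyncWithFixedThreadPool() throws InterruptedException {

        final int numThreads = 100;

        final CloseableHttpClient unitUnderTest = HttpClientBuilder.create()
                .setMaxConnTotal(5000)
                .setMaxConnPerRoute(5000)
                .build();

        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
        final CountDownLatch remaining = new CountDownLatch(numThreads);

        // Each worker thread runs its share of the requests sequentially; no parallel stream,
        // so nothing funnels through a shared ForkJoinPool.
        for (int threadNum = 0; threadNum < numThreads; threadNum++) {
            executor.submit(() -> {
                try {
                    for (int testNum = 0; testNum < totalTests / numThreads; testNum++) {
                        final long runStart = System.currentTimeMillis();
                        try (final CloseableHttpResponse response = unitUnderTest.execute(new HttpGet(testUri))) {
                            // the response body is ignored; we only care that the request completed
                        } catch (final IOException e) {
                            Assertions.fail(e);
                        }
                        statsAggregator.addTestDuration(System.currentTimeMillis() - runStart);
                    }
                } finally {
                    remaining.countDown();
                }
            });
        }

        remaining.await();
        executor.shutdown();
        statsAggregator.printStats();
    }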
