首页 > 解决方案 > executorService多线程池的性能

问题描述

我正在使用 Java 的并发库 ExecutorService 来运行我的任务。写入数据库的阈值是 200 QPS,但是这个程序在 15 个线程的情况下只能达到 20 QPS。我尝试了 5、10、20、30 个线程,它们甚至比 15 个线程还慢。这是代码:

ExecutorService executor = Executors.newFixedThreadPool(15);
List<Callable<Object>> todos = new ArrayList<>();

for (final int id : ids) {
    todos.add(Executors.callable(() -> {
        try {
            TestObject test = testServiceClient.callRemoteService();
    SaveToDatabase();
        } catch (Exception ex) {}
    }));
}
try {
    executor.invokeAll(todos);
} catch (InterruptedException ex) {} 
executor.shutdown();

1)我检查了运行这个程序的linux服务器的CPU使用率,使用率分别为90%和60%(它有4个CPU)。内存使用率仅为 20%。所以CPU和内存还是不错的。数据库服务器的 CPU 使用率很低(大约 20%)。什么可以阻止速度达到 200 QPS?也许这个服务调用:testServiceClient.callRemoteService()?我检查了该调用的服务器配置,它允许每秒进行大量调用。

2)如果ids中的id计数超过50000,使用invokeAll是不是好主意?我们是否应该将其拆分为更小的批次,例如每批次 5000 个?

标签: javamultithreadingexecutorservice

解决方案


这段代码中没有什么可以阻止这种查询率,除了重复创建和销毁线程池非常昂贵。我建议使用 Streams API,它不仅更简单,而且可以重用内置线程池

int[] ids = ....
IntStream.of(ids).parallel()
                 .forEach(id -> testServiceClient.callRemoteService(id));

这是使用琐碎服务的基准。主要开销是创建连接的延迟。

public static void main(String[] args) throws IOException {
    ServerSocket ss = new ServerSocket(0);
    Thread service = new Thread(() -> {
        try {
            for (; ; ) {
                try (Socket s = ss.accept()) {
                    s.getOutputStream().write(s.getInputStream().read());
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    });
    service.setDaemon(true);
    service.start();

    for (int t = 0; t < 5; t++) {
        long start = System.nanoTime();
        int[] ids = new int[5000];
        IntStream.of(ids).parallel().forEach(id -> {
            try {
                Socket s = new Socket("localhost", ss.getLocalPort());
                s.getOutputStream().write(id);
                s.getInputStream().read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        long time = System.nanoTime() - start;
        System.out.println("Throughput " + (int) (ids.length * 1e9 / time) + " connects/sec");
    }
}

印刷

Throughput 12491 connects/sec
Throughput 13138 connects/sec
Throughput 15148 connects/sec
Throughput 14602 connects/sec
Throughput 15807 connects/sec

正如@grzegorz-piwowarek 提到的那样,使用 ExecutorService 会更好。

    ExecutorService es = Executors.newFixedThreadPool(8);
    for (int t = 0; t < 5; t++) {
        long start = System.nanoTime();
        int[] ids = new int[5000];
        List<Future> futures = new ArrayList<>(ids.length);
        for (int id : ids) {
            futures.add(es.submit(() -> {
                try {
                    Socket s = new Socket("localhost", ss.getLocalPort());
                    s.getOutputStream().write(id);
                    s.getInputStream().read();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }));
        }
        for (Future future : futures) {
            future.get();
        }
        long time = System.nanoTime() - start;
        System.out.println("Throughput " + (int) (ids.length * 1e9 / time) + " connects/sec");
    }
    es.shutdown();

在这种情况下产生几乎相同的结果。


推荐阅读