java - executorService多线程池的性能
问题描述
我正在使用 Java 的并发库 ExecutorService 来运行我的任务。写入数据库的阈值是 200 QPS,但是这个程序在 15 个线程的情况下只能达到 20 QPS。我尝试了 5、10、20、30 个线程,它们甚至比 15 个线程还慢。这是代码:
ExecutorService executor = Executors.newFixedThreadPool(15);
List<Callable<Object>> todos = new ArrayList<>();
for (final int id : ids) {
todos.add(Executors.callable(() -> {
try {
TestObject test = testServiceClient.callRemoteService();
SaveToDatabase();
} catch (Exception ex) {}
}));
}
try {
executor.invokeAll(todos);
} catch (InterruptedException ex) {}
executor.shutdown();
1)我检查了运行这个程序的linux服务器的CPU使用率,使用率分别为90%和60%(它有4个CPU)。内存使用率仅为 20%。所以CPU和内存还是不错的。数据库服务器的 CPU 使用率很低(大约 20%)。什么可以阻止速度达到 200 QPS?也许这个服务调用:testServiceClient.callRemoteService()
?我检查了该调用的服务器配置,它允许每秒进行大量调用。
2)如果ids中的id计数超过50000,使用invokeAll是不是好主意?我们是否应该将其拆分为更小的批次,例如每批次 5000 个?
解决方案
这段代码中没有什么可以阻止这种查询率,除了重复创建和销毁线程池非常昂贵。我建议使用 Streams API,它不仅更简单,而且可以重用内置线程池
int[] ids = ....
IntStream.of(ids).parallel()
.forEach(id -> testServiceClient.callRemoteService(id));
这是使用琐碎服务的基准。主要开销是创建连接的延迟。
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(0);
Thread service = new Thread(() -> {
try {
for (; ; ) {
try (Socket s = ss.accept()) {
s.getOutputStream().write(s.getInputStream().read());
}
}
} catch (Throwable t) {
t.printStackTrace();
}
});
service.setDaemon(true);
service.start();
for (int t = 0; t < 5; t++) {
long start = System.nanoTime();
int[] ids = new int[5000];
IntStream.of(ids).parallel().forEach(id -> {
try {
Socket s = new Socket("localhost", ss.getLocalPort());
s.getOutputStream().write(id);
s.getInputStream().read();
} catch (IOException e) {
e.printStackTrace();
}
});
long time = System.nanoTime() - start;
System.out.println("Throughput " + (int) (ids.length * 1e9 / time) + " connects/sec");
}
}
印刷
Throughput 12491 connects/sec
Throughput 13138 connects/sec
Throughput 15148 connects/sec
Throughput 14602 connects/sec
Throughput 15807 connects/sec
正如@grzegorz-piwowarek 提到的那样,使用 ExecutorService 会更好。
ExecutorService es = Executors.newFixedThreadPool(8);
for (int t = 0; t < 5; t++) {
long start = System.nanoTime();
int[] ids = new int[5000];
List<Future> futures = new ArrayList<>(ids.length);
for (int id : ids) {
futures.add(es.submit(() -> {
try {
Socket s = new Socket("localhost", ss.getLocalPort());
s.getOutputStream().write(id);
s.getInputStream().read();
} catch (IOException e) {
e.printStackTrace();
}
}));
}
for (Future future : futures) {
future.get();
}
long time = System.nanoTime() - start;
System.out.println("Throughput " + (int) (ids.length * 1e9 / time) + " connects/sec");
}
es.shutdown();
在这种情况下产生几乎相同的结果。
推荐阅读
- asp.net - 嵌套内联运算符 <% 给出换行错误
- html - 使用 CSS 给我的网页一个白色的前景,灰色的背景(模仿 MS Word)
- iterm2 - iTerm2 3.3 Python API——如何创建一个 2x2 的会话网格?
- r - 在 R 中(以及安装 ROracle 包时),如何设置 OCI_LIB64?
- php - Python Xampp 错误:标头之前的脚本输出结束
- angular - 在一系列返回后完成一个 observable
- node.js - 请求超时时如何停止 Express 中的进程?
- opengl - 使用 GLFW 重新创建 OpenGL 上下文
- sql-server - SQL Server 的 xp_cmdshell 更新后速度很慢
- laravel - Composer 无法访问本地 Satis 服务器