java - 不同机器上的java parallelStreams
问题描述
我有一个函数,它在 forEach 中使用 parallelStream 迭代列表,然后调用一个 API,并将该项目作为参数。然后我将结果存储在 hashMap 中。
try {
return answerList.parallelStream()
.map(answer -> getReplyForAnswerCombination(answer))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} catch (final NullPointerException e) {
log.error("Error in generating final results.", e);
return null;
}
当我在笔记本电脑 1 上运行它时,需要 1 小时。但在笔记本电脑 2 上,需要 5 个小时。
做一些基础研究后,我发现并行流使用默认的 ForkJoinPool.commonPool,默认情况下它的线程数比处理器少一个。
笔记本电脑 1 和笔记本电脑 2 具有不同的处理器。
- 有没有办法找出可以在笔记本电脑 1 和笔记本电脑 2 上并行运行的流数?
- 我可以使用此处给出的建议安全地增加笔记本电脑2 中的并行流数量吗?
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});
解决方案
项目织机
如果您希望在阻塞的线程代码(而不是 CPU 绑定代码)上获得最大性能,请使用Project Loom中提供的虚拟线程(纤程) 。基于早期访问 Java 16 的初步构建现已可用。
虚拟线程
虚拟线程可以显着加快,因为虚拟线程在被阻塞时被“停放”,被搁置一旁,因此另一个虚拟线程可以取得进展。这对于阻塞任务非常有效,线程数可以达到数百万。
放弃流的方法。只需将每个输入发送到虚拟线程。
完整的示例代码
Answer
让我们为和定义类Reply
,我们的输入和输出。我们将使用record
Java 16 中的一个新特性,作为定义不可变数据驱动类的缩写方式。equals
编译器隐式创建构造函数、getter、 &hashCode
和的默认实现toString
。
public record Answer (String text)
{
}
…和:
public record Reply (String text)
{
}
定义我们要提交给执行器服务的任务。我们编写了一个名为ReplierTask
实现Runnable
(具有run
方法)的类。
在该run
方法中,我们休眠当前线程以模拟等待对数据库、文件系统和/或远程服务的调用。
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
public class ReplierTask implements Runnable
{
private Answer answer;
ConcurrentMap < Answer, Reply > map;
public ReplierTask ( Answer answer , ConcurrentMap < Answer, Reply > map )
{
this.answer = answer;
this.map = map;
}
private Reply getReplyForAnswerCombination ( Answer answer )
{
// Simulating a call to some service to produce a `Reply` object.
try { Thread.sleep( Duration.ofSeconds( 1 ) ); } catch ( InterruptedException e ) { e.printStackTrace(); } // Simulate blocking to wait for call to service or db or such.
return new Reply( UUID.randomUUID().toString() );
}
// `Runnable` interface
@Override
public void run ( )
{
System.out.println( "`run` method at " + Instant.now() + " for answer: " + this.answer );
Reply reply = this.getReplyForAnswerCombination( this.answer );
this.map.put( this.answer , reply );
}
}
最后,一些代码来完成这项工作。我们创建了一个Mapper
包含main
方法的类。
我们通过填充对象数组来模拟一些输入Answer
。我们创建一个空ConcurrentMap
来收集结果。我们将每个Answer
对象分配给一个新线程,在该线程中我们调用一个新Reply
对象并将Answer
/Reply
对作为条目存储在映射中。
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Mapper
{
public static void main ( String[] args )
{
System.out.println("Runtime.version(): " + Runtime.version() );
System.out.println("availableProcessors: " + Runtime.getRuntime().availableProcessors());
System.out.println("maxMemory: " + Runtime.getRuntime().maxMemory() + " | maxMemory/(1024*1024) -> megs: " +Runtime.getRuntime().maxMemory()/(1024*1024) );
Mapper app = new Mapper();
app.demo();
}
private void demo ( )
{
// Simulate our inputs, a list of `Answer` objects.
int limit = 10_000;
List < Answer > answers = new ArrayList <>( limit );
for ( int i = 0 ; i < limit ; i++ )
{
answers.add( new Answer( String.valueOf( i ) ) );
}
// Do the work.
Instant start = Instant.now();
System.out.println( "Starting work at: " + start + " on count of tasks: " + limit );
ConcurrentMap < Answer, Reply > results = new ConcurrentHashMap <>();
try
(
ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
// Executors.newFixedThreadPool( 5 )
// Executors.newFixedThreadPool( 10 )
// Executors.newFixedThreadPool( 1_000 )
// Executors.newVirtualThreadExecutor()
)
{
for ( Answer answer : answers )
{
ReplierTask task = new ReplierTask( answer , results );
executorService.submit( task );
}
}
// At this point the flow-of-control blocks until all submitted tasks are done.
// The executor service is automatically closed by this point as well.
Duration elapsed = Duration.between( start , Instant.now() );
System.out.println( "results.size() = " + results.size() + ". Elapsed: " + elapsed );
}
}
我们可以Executors.newVirtualThreadExecutor()
用一个平台线程池更改它,以与虚拟线程进行比较。让我们在 Mac mini Intel 上尝试 5、10 和 1,000 个平台线程池,macOS Mojave 具有 6 个真正的内核、无超线程、32 gigs 内存和 OpenJDK 特殊构建版本 16-loom+9-316 分配的 maxMemory 8 场演出。
10,000 个任务,每个任务 1 秒 | 总经过时间 |
---|---|
5个平台线程 | 半小时——PT33M29.755792S |
10个平台线程 | 一刻钟 — PT16M43.318973S |
1,000 个平台线程 | 10 秒 — PT10.487689S |
10,000 个平台线程 | 错误...<br />无法创建本机线程:可能内存不足或已达到进程/资源限制 |
虚拟线程 | 不到 3 秒 — PT2.645964S |
注意事项
警告:Project Loom 是实验性的,可能会发生变化,尚未用于生产用途。该团队现在要求人们提供反馈。
警告:像编码视频这样的 CPU 密集型任务应该坚持使用平台/内核线程而不是虚拟线程。大多数执行阻塞操作(例如 I/O)的常见代码,例如访问文件、日志记录、访问数据库或进行网络调用,都可能会通过虚拟线程获得巨大的性能提升。
警告:您必须有足够的内存供您的许多甚至所有任务同时运行。如果没有足够的内存可用,您必须采取额外的步骤来限制虚拟线程。
推荐阅读
- javascript - 如何在本机反应中根据平面列表中的列呈现数据
- mule4 - 如何在 Mule 中查找 IBM MQ 中的消息数
- node.js - Angular:如何从服务内的事件中获取数据到我的组件?
- python - 如何在提交作业的远程服务器上运行 jupyter 笔记本?
- python - tf.Graph() 中许多模型的缓慢处理
- javascript - Angular Multi owl-carousel-o
- android - 使用 Android PorterDuff 模式时不绘制整数 alpha
- python - 如何过滤掉差异小于阈值的行?
- nginx - 如何使用 nginx 屏蔽子域
- ruby-on-rails - 检索所有记录时的替代“全部”方法