首页 > 解决方案 > 不同机器上的java parallelStreams

问题描述

我有一个函数,它在 forEach 中使用 parallelStream 迭代列表,然后调用一个 API,并将该项目作为参数。然后我将结果存储在 hashMap 中。

    try {
            return answerList.parallelStream()
                    .map(answer -> getReplyForAnswerCombination(answer))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        } catch (final NullPointerException e) {
            log.error("Error in generating final results.", e);
            return null;
        }

当我在笔记本电脑 1 上运行它时,需要 1 小时。但在笔记本电脑 2 上,需要 5 个小时。

做一些基础研究后,我发现并行流使用默认的 ForkJoinPool.commonPool,默认情况下它的线程数比处理器少一个。

笔记本电脑 1 和笔记本电脑 2 具有不同的处理器。

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

标签: javaconcurrencyparallel-processingjava-stream

解决方案


项目织机

如果您希望在阻塞的线程代码(而不是 CPU 绑定代码)上获得最大性能,请使用Project Loom中提供的虚拟线程(纤程) 。基于早期访问 Java 16 的初步构建现已可用。

虚拟线程

虚拟线程可以显着加快,因为虚拟线程在被阻塞时被“停放”,被搁置一旁,因此另一个虚拟线程可以取得进展。这对于阻塞任务非常有效,线程数可以达到数百万。

放弃流的方法。只需将每个输入发送到虚拟线程。

完整的示例代码

Answer让我们为和定义类Reply,我们的输入和输出。我们将使用recordJava 16 中的一个新特性,作为定义不可变数据驱动类的缩写方式。equals编译器隐式创建构造函数、getter、 &hashCode和的默认实现toString

public record Answer (String text)
{
}

…和:

public record Reply (String text)
{
}

定义我们要提交给执行器服务的任务。我们编写了一个名为ReplierTask实现Runnable(具有run方法)的类。

在该run方法中,我们休眠当前线程以模拟等待对数据库、文件系统和/或远程服务的调用。

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

public class ReplierTask implements Runnable
{
    private Answer answer;
    ConcurrentMap < Answer, Reply > map;

    public ReplierTask ( Answer answer , ConcurrentMap < Answer, Reply > map )
    {
        this.answer = answer;
        this.map = map;
    }

    private Reply getReplyForAnswerCombination ( Answer answer )
    {
        // Simulating a call to some service to produce a `Reply` object.
        try { Thread.sleep( Duration.ofSeconds( 1 ) ); } catch ( InterruptedException e ) { e.printStackTrace(); }  // Simulate blocking to wait for call to service or db or such.
        return new Reply( UUID.randomUUID().toString() );
    }

    // `Runnable` interface
    @Override
    public void run ( )
    {
        System.out.println( "`run` method at " + Instant.now() + " for answer: " + this.answer );
        Reply reply = this.getReplyForAnswerCombination( this.answer );
        this.map.put( this.answer , reply );
    }
}

最后,一些代码来完成这项工作。我们创建了一个Mapper包含main方法的类。

我们通过填充对象数组来模拟一些输入Answer。我们创建一个空ConcurrentMap来收集结果。我们将每个Answer对象分配给一个新线程,在该线程中我们调用一个新Reply对象并将Answer/Reply对作为条目存储在映射中。

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Mapper
{
    public static void main ( String[] args )
    {
        System.out.println("Runtime.version(): " + Runtime.version() );
        System.out.println("availableProcessors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("maxMemory: " + Runtime.getRuntime().maxMemory() + " | maxMemory/(1024*1024) -> megs: " +Runtime.getRuntime().maxMemory()/(1024*1024)  );
        Mapper app = new Mapper();
        app.demo();
    }

    private void demo ( )
    {
        // Simulate our inputs, a list of `Answer` objects.
        int limit = 10_000;
        List < Answer > answers = new ArrayList <>( limit );
        for ( int i = 0 ; i < limit ; i++ )
        {
            answers.add( new Answer( String.valueOf( i ) ) );
        }

        // Do the work.
        Instant start = Instant.now();
        System.out.println( "Starting work at: " + start + " on count of tasks: " + limit );
        ConcurrentMap < Answer, Reply > results = new ConcurrentHashMap <>();
        try
                (
                        ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
                        // Executors.newFixedThreadPool( 5 )
                        // Executors.newFixedThreadPool( 10 )
                        // Executors.newFixedThreadPool( 1_000 )
                        // Executors.newVirtualThreadExecutor()
                )
        {
            for ( Answer answer : answers )
            {
                ReplierTask task = new ReplierTask( answer , results );
                executorService.submit( task );
            }
        }
        // At this point the flow-of-control blocks until all submitted tasks are done.
        // The executor service is automatically closed by this point as well.
        Duration elapsed = Duration.between( start , Instant.now() );
        System.out.println( "results.size() = " + results.size() + ". Elapsed: " + elapsed );
    }
}

我们可以Executors.newVirtualThreadExecutor()用一个平台线程池更改它,以与虚拟线程进行比较。让我们在 Mac mini Intel 上尝试 5、10 和 1,000 个平台线程池,macOS Mojave 具有 6 个真正的内核、无超线程、32 gigs 内存和 OpenJDK 特殊构建版本 16-loom+9-316 分配的 maxMemory 8 场演出。

10,000 个任务,每个任务 1 秒 总经过时间
5个平台线程 半小时——PT33M29.755792S
10个平台线程 一刻钟 — PT16M43.318973S
1,000 个平台线程 10 秒 — PT10.487689S
10,000 个平台线程 错误...<br />无法创建本机线程:可能内存不足或已达到进程/资源限制
虚拟线程 不到 3 秒 — PT2.645964S

注意事项

警告:Project Loom 是实验性的,可能会发生变化,尚未用于生产用途。该团队现在要求人们提供反馈。

警告:像编码视频这样的 CPU 密集型任务应该坚持使用平台/内核线程而不是虚拟线程。大多数执行阻塞操作(例如 I/O)的常见代码,例如访问文件、日志记录、访问数据库或进行网络调用,都可能会通过虚拟线程获得巨大的性能提升。

警告:您必须有足够的内存供您的许多甚至所有任务同时运行。如果没有足够的内存可用,您必须采取额外的步骤来限制虚拟线程。


推荐阅读