Parallel stream - how parallel is it?

Problem description

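I have a parallel stream, shown in Runner.java below. However, I noticed that the aggregate method in the Runner class runs sequentially. I suspect the forEach I use there is the cause, though I am not sure, but the loop cannot be broken. My conclusion that it runs sequentially is based on the thread names I print from the getter method of the class below.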

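Although the code below builds a list of 1000 objects, I reduced the list to 10 items when printing thread names so that the log would be small enough to paste here. The logs follow Runner.java. Looking at them, the aggregate method is always executed by the main thread, never by a ForkJoinPool.commonPool worker. The same behavior is observed with 1000 items. Can someone help me confirm whether forEach is what causes the sequential execution?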

Employee.java

public class Employee {
   private double salary;
   private String fName;
   private String lName;

   public Employee(double salary, String fName, String lName) {
      this.salary = salary;
      this.fName = fName;
      this.lName = lName;
   } 

   public Employee() {
   }

   public double getSalary() {
      return salary;
   }

   public void setSalary(double salary) {
     this.salary = salary;
   }

   public String getfName() {
      System.out.println("Thread.currentThread() + \"for getFname\" = " + Thread.currentThread() + 
     " for getFname");
      return fName;
   }

   public void setfName(String fName) {
      this.fName = fName;
   }

   public String getlName() {
       return lName;
   }

   public void setlName(String lName) {
       this.lName = lName;
   }

   @Override
   public String toString() {
       return "E{" +
            "s=" + salary +
            ", N='" + fName + '\'' +
            '}';
    }
  }

Runner.java:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Runner {
    public static void main(String[] args) {
        ArrayList<Employee> list = new ArrayList<>();

        for (int i = 0; i < 1000; i++) {
            list.add(new Employee(i * 100, "A" + i, "Z" + i));

        }
        long l = System.currentTimeMillis();
        List<Employee> collect = list
                .parallelStream()
                .collect(Collectors.collectingAndThen(Collectors.groupingBy(Employee::getfName), Runner::aggregate));
        System.out.println("collect = " + collect.size());
        System.out.println(collect.stream().mapToDouble(e -> e.getSalary()).sum());
    }

    private static List<Employee> aggregate(Map<String, List<Employee>> t) {
        ArrayList<Employee> emps = new ArrayList<>();
        t.entrySet()
                .forEach(e -> {
                    System.out.println("Thread.currentThread() + \"within agrgate\" = " + Thread.currentThread() + "within agrgate");
                    Employee e1 = new Employee();
                    e.getValue().forEach(r -> {
                        e1.setSalary(e1.getSalary() + r.getSalary());
                        e1.setfName(r.getfName());
                    });
                    emps.add(e1);
                });
        return emps;
    }
}

Logs:

Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-1,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-3,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-2,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-3,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-1,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname

Tags: java, java-stream

Solution


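The forEach method you are calling comes from the Iterable interface, where "actions are performed in the order of iteration...". Note also that Runner::aggregate is the finisher passed to collectingAndThen, so it is invoked once, after the parallel grouping has completed; in the logs above that call happens on the main thread.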
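To see this in isolation, here is a minimal sketch that reports which thread runs the collectingAndThen finisher. The class and method names (FinisherThreadDemo, finisherThreadName) are hypothetical and not part of the question's code, and the observation that the finisher runs on the calling thread reflects OpenJDK behavior rather than a documented guarantee.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FinisherThreadDemo {

    // Returns the name of the thread that executes the finisher function
    // of collectingAndThen for a parallel stream.
    static String finisherThreadName() {
        List<Integer> numbers = IntStream.range(0, 1000)
                .boxed()
                .collect(Collectors.toList());

        return numbers.parallelStream()
                .collect(Collectors.collectingAndThen(
                        // The grouping itself may be split across worker threads.
                        Collectors.groupingBy((Integer n) -> n % 10),
                        // The finisher is applied once to the merged map.
                        (Map<Integer, List<Integer>> grouped) ->
                                Thread.currentThread().getName()));
    }

    public static void main(String[] args) {
        System.out.println("caller   = " + Thread.currentThread().getName());
        System.out.println("finisher = " + finisherThreadName());
    }
}
```

If both lines print the same thread name, any work done inside the finisher is sequential unless the finisher starts a parallel stream of its own.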

Instead, you can use the forEach of a Stream:

t.entrySet().parallelStream().forEach( e->{ 
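One caveat with the fragment above: in the question's aggregate method the lambda adds to emps, a plain ArrayList, which is not safe to mutate from several threads at once. A possible variant, sketched below under assumptions, maps each entry to a result and lets collect build the list instead. Emp is a trimmed-down, hypothetical stand-in for the question's Employee, and ParallelAggregateSketch, run, and totalSalary are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ParallelAggregateSketch {

    // Hypothetical, immutable stand-in for the question's Employee class.
    static class Emp {
        final String fName;
        final double salary;

        Emp(String fName, double salary) {
            this.fName = fName;
            this.salary = salary;
        }
    }

    // Turns each group into one Emp holding the summed salary.
    // Entries are processed independently, and collect(toList()) gathers
    // the results, so no shared mutable list is touched by worker threads.
    static List<Emp> aggregate(Map<String, List<Emp>> groups) {
        return groups.entrySet()
                .parallelStream()
                .map(e -> new Emp(
                        e.getKey(),
                        e.getValue().stream().mapToDouble(r -> r.salary).sum()))
                .collect(Collectors.toList());
    }

    // Builds n employees as in the question, groups them by first name,
    // then aggregates each group.
    static List<Emp> run(int n) {
        List<Emp> list = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            list.add(new Emp("A" + i, i * 100));
        }
        return list.parallelStream()
                .collect(Collectors.collectingAndThen(
                        Collectors.groupingBy((Emp e) -> e.fName),
                        ParallelAggregateSketch::aggregate));
    }

    // Sums the salaries of the aggregated result.
    static double totalSalary(List<Emp> emps) {
        return emps.stream().mapToDouble(e -> e.salary).sum();
    }

    public static void main(String[] args) {
        List<Emp> result = run(1000);
        System.out.println("collect = " + result.size());
        System.out.println(totalSalary(result));
    }
}
```

If only the per-name salary total is needed, another option is to pass Collectors.summingDouble as the downstream collector of groupingBy, which folds the summation into the parallel collect itself. Whether either variant is actually faster than the sequential loop for groups this small is something to measure rather than assume.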
