java - Parallel stream - how parallel is it?
Problem description
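I have a parallel stream, shown in Runner.java below. However, I noticed that the aggregate method in the Runner class runs sequentially. I suspect the forEach I use there is the cause, although I'm not sure, and I can't do away with that loop. My conclusion that it runs sequentially is based on the thread names I print in a getter method of the class below.
The code below builds a list of 1000 objects, but to keep the log small enough to paste here I reduced the list to 10 items when printing the thread names. The log follows Runner.java. It shows that the aggregate method is always executed by the main thread and never by the common pool. I observed the same behavior with 1000 items. Can someone tell me whether forEach is what causes the sequential execution?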
Employee.java
    public class Employee {
        private double salary;
        private String fName;
        private String lName;

        public Employee(double salary, String fName, String lName) {
            this.salary = salary;
            this.fName = fName;
            this.lName = lName;
        }

        public Employee() {
        }

        public double getSalary() {
            return salary;
        }

        public void setSalary(double salary) {
            this.salary = salary;
        }

        public String getfName() {
            System.out.println("Thread.currentThread() + \"for getFname\" = " + Thread.currentThread() +
                    " for getFname");
            return fName;
        }

        public void setfName(String fName) {
            this.fName = fName;
        }

        public String getlName() {
            return lName;
        }

        public void setlName(String lName) {
            this.lName = lName;
        }

        @Override
        public String toString() {
            return "E{" +
                    "s=" + salary +
                    ", N='" + fName + '\'' +
                    '}';
        }
    }
Runner.java:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class Runner {
        public static void main(String[] args) {
            ArrayList<Employee> list = new ArrayList<>();
            for (int i = 0; i < 1000; i++) {
                list.add(new Employee(i * 100, "A" + i, "Z" + i));
            }
            long l = System.currentTimeMillis();
            // The grouping runs in parallel; Runner::aggregate is the finisher,
            // applied once to the completed map.
            List<Employee> collect = list
                    .parallelStream()
                    .collect(Collectors.collectingAndThen(
                            Collectors.groupingBy(Employee::getfName),
                            Runner::aggregate));
            System.out.println("collect = " + collect.size());
            System.out.println(collect.stream().mapToDouble(e -> e.getSalary()).sum());
        }

        private static List<Employee> aggregate(Map<String, List<Employee>> t) {
            ArrayList<Employee> emps = new ArrayList<>();
            // entrySet() returns a Set, so this forEach is Iterable.forEach, not a stream operation.
            t.entrySet()
                    .forEach(e -> {
                        System.out.println("Thread.currentThread() + \"within agrgate\" = " + Thread.currentThread() + "within agrgate");
                        Employee e1 = new Employee();
                        e.getValue().forEach(r -> {
                            e1.setSalary(e1.getSalary() + r.getSalary());
                            e1.setfName(r.getfName());
                        });
                        emps.add(e1);
                    });
            return emps;
        }
    }
Logs:
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-1,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-3,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-2,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-3,5,main]for getFname
Thread.currentThread() + "for getFname" = Thread[ForkJoinPool.commonPool-worker-1,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Thread.currentThread() + "within agrgate" = Thread[main,5,main]within agrgate
Thread.currentThread() + "for getFname" = Thread[main,5,main]for getFname
Solution
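The forEach method you call in aggregate comes from the Iterable interface, whose documentation says that "actions are performed in the order of iteration...". It is an ordinary loop over the entry set, so it runs on whichever thread calls it, which here is the main thread.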
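A minimal sketch of that behavior, using a hypothetical `IterableForEachDemo` class and helper `threadsUsedByForEach` that are not part of the question: it records which threads execute the action passed to `entrySet().forEach(...)`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

public class IterableForEachDemo {
    // Returns the names of all threads that executed the forEach action.
    static Set<String> threadsUsedByForEach(Map<String, Integer> map) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        // entrySet() is a Set, so this is Iterable.forEach: a plain loop on the caller's thread.
        map.entrySet().forEach(e -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        Map<String, Integer> map = new TreeMap<>();
        for (int i = 0; i < 1000; i++) {
            map.put("A" + i, i);
        }
        // Only the calling thread's name should show up, however large the map is.
        System.out.println(threadsUsedByForEach(map));
    }
}
```

Run from `main`, the returned set should contain only the calling thread's name, which matches the "within agrgate" lines in the log.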
To run it in parallel, call forEach on a parallel Stream instead:
    t.entrySet().parallelStream().forEach(e -> {
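One caveat with that one-line change: the lambda in the question adds to a shared `ArrayList` (`emps`), and `ArrayList` is not safe for concurrent `add` calls. The sketch below shows one possible way around it, assuming it is acceptable to build each aggregate with `map` and gather the results with `collect` rather than mutating a shared list. `ParallelAggregateDemo` and `Emp` are hypothetical stand-ins for the question's `Runner` and `Employee`, reduced to the fields needed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ParallelAggregateDemo {
    // Hypothetical stand-in for the question's Employee class, reduced to two fields.
    static final class Emp {
        final String fName;
        final double salary;

        Emp(String fName, double salary) {
            this.fName = fName;
            this.salary = salary;
        }

        String name() {
            return fName;
        }
    }

    // Builds one summed Emp per group; map + collect avoids adding to a shared ArrayList.
    static List<Emp> aggregate(Map<String, List<Emp>> groups) {
        return groups.entrySet()
                .parallelStream()
                .map(e -> new Emp(
                        e.getKey(),
                        e.getValue().stream().mapToDouble(r -> r.salary).sum()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Emp> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            list.add(new Emp("A" + i, i * 100));
        }
        List<Emp> result = list.parallelStream()
                .collect(Collectors.collectingAndThen(
                        Collectors.groupingBy(Emp::name),
                        ParallelAggregateDemo::aggregate));
        System.out.println("collect = " + result.size());
        System.out.println(result.stream().mapToDouble(e -> e.salary).sum());
    }
}
```

With groups this small, the per-entry work is tiny, so the parallel version may not be faster than the sequential loop; measuring both on realistic data is the safer way to decide.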