java - Java:循环等待,直到 ThreadPoolExecutor 的任务完成后再继续
问题描述
我正在努力使 Dijkstra 算法并行化。使每个节点线程查看当前节点的所有边缘。这是与线程并行的,但开销太大。这导致比算法的顺序版本更长的时间。
添加了 ThreadPool 来解决此问题,但我无法等待任务完成才能继续进行下一次迭代。只有在完成一个节点的所有任务之后,我们才应该继续。在我可以按节点搜索下一个最近的任务之前,我们需要所有任务的结果。
我试过做 executor.shutdown() 但用这种方法它不会接受新任务。我们如何在循环中等待,直到每个任务都完成,而不必每次都声明 ThreadPoolExecutor。这样做会破坏通过使用它而不是常规线程来减少开销的目的。
我想到的一件事是添加任务(边缘)的 BlockingQueue。但同样对于这个解决方案,我坚持等待任务在没有 shudown() 的情况下完成。
public void apply(int numberOfThreads) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numberOfThreads);
class DijkstraTask implements Runnable {
private String name;
public DijkstraTask(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {
calculateShortestDistances(numberOfThreads);
}
}
// Visit every node, in order of stored distance
for (int i = 0; i < this.nodes.length; i++) {
//Add task for each node
for (int t = 0; t < numberOfThreads; t++) {
executor.execute(new DijkstraTask("Task " + t));
}
//Wait until finished?
while (executor.getActiveCount() > 0) {
System.out.println("Active count: " + executor.getActiveCount());
}
//Look through the results of the tasks and get the next node that is closest by
currentNode = getNodeShortestDistanced();
//Reset the threadCounter for next iteration
this.setCount(0);
}
}
边数除以线程数。所以 8 个边和 2 个线程意味着每个线程将并行处理 4 个边。
public void calculateShortestDistances(int numberOfThreads) {
int threadCounter = this.getCount();
this.setCount(count + 1);
// Loop round the edges that are joined to the current node
currentNodeEdges = this.nodes[currentNode].getEdges();
int edgesPerThread = currentNodeEdges.size() / numberOfThreads;
int modulo = currentNodeEdges.size() % numberOfThreads;
this.nodes[0].setDistanceFromSource(0);
//Process the edges per thread
for (int joinedEdge = (edgesPerThread * threadCounter); joinedEdge < (edgesPerThread * (threadCounter + 1)); joinedEdge++) {
System.out.println("Start: " + (edgesPerThread * threadCounter) + ". End: " + (edgesPerThread * (threadCounter + 1) + ".JoinedEdge: " + joinedEdge) + ". Total: " + currentNodeEdges.size());
// Determine the joined edge neighbour of the current node
int neighbourIndex = currentNodeEdges.get(joinedEdge).getNeighbourIndex(currentNode);
// Only interested in an unvisited neighbour
if (!this.nodes[neighbourIndex].isVisited()) {
// Calculate the tentative distance for the neighbour
int tentative = this.nodes[currentNode].getDistanceFromSource() + currentNodeEdges.get(joinedEdge).getLength();
// Overwrite if the tentative distance is less than what's currently stored
if (tentative < nodes[neighbourIndex].getDistanceFromSource()) {
nodes[neighbourIndex].setDistanceFromSource(tentative);
}
}
}
//if we have a modulo above 0, the last thread will process the remaining edges
if (modulo > 0 && numberOfThreads == (threadCounter + 1)) {
for (int joinedEdge = (edgesPerThread * threadCounter); joinedEdge < (edgesPerThread * (threadCounter) + modulo); joinedEdge++) {
// Determine the joined edge neighbour of the current node
int neighbourIndex = currentNodeEdges.get(joinedEdge).getNeighbourIndex(currentNode);
// Only interested in an unvisited neighbour
if (!this.nodes[neighbourIndex].isVisited()) {
// Calculate the tentative distance for the neighbour
int tentative = this.nodes[currentNode].getDistanceFromSource() + currentNodeEdges.get(joinedEdge).getLength();
// Overwrite if the tentative distance is less than what's currently stored
if (tentative < nodes[neighbourIndex].getDistanceFromSource()) {
nodes[neighbourIndex].setDistanceFromSource(tentative);
}
}
}
}
// All neighbours are checked so this node is now visited
nodes[currentNode].setVisited(true);
}
谢谢你帮助我!
解决方案
您应该查看CyclicBarrier
或CountDownLatch
. 这两种方法都允许您阻止线程启动,除非其他线程已发出信号表明它们已完成。它们之间的区别在于 CyclicBarrier
可重复使用,即可以多次使用,而CountDownLatch
一次性使用则不能重置计数。
从 Javadocs 解释:
CountDownLatch是一种同步辅助工具,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
CyclicBarrier是一种同步辅助工具,它允许一组线程相互等待以达到共同的屏障点。CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程组必须偶尔相互等待。屏障被称为循环的,因为它可以在等待线程被释放后重新使用。
https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CyclicBarrier.html
推荐阅读
- javascript - 响应式图像地图推出重新定位
- python - 根据列表整数值返回字典键
- php - laravel 在 npm 运行 watch 命令后显示 menifest.js 的未定义索引
- ios - 快速更新已编辑文本视图的问题
- python - 如何在 Python 中制作基于字符串的多维数组
- flask - 您是否需要为 OpenID Grants 客户端生成自己的 nonce 值?
- node.js - 如何判断示例中给定字符串上使用的加密?
- java - 我的 sqlite 数据库是什么时候创建的?
- c# - 第一次按下一个键并按住它后,有没有办法摆脱反应的短暂停顿?
- c - 生成没有 time.h 的随机值