首页 > 解决方案 > Java:循环等待,直到 ThreadPoolExecutor 的任务完成后再继续

问题描述

我正在努力使 Dijkstra 算法并行化。使每个节点线程查看当前节点的所有边缘。这是与线程并行的,但开销太大。这导致比算法的顺序版本更长的时间。

添加了 ThreadPool 来解决此问题,但我无法等待任务完成才能继续进行下一次迭代。只有在完成一个节点的所有任务之后,我们才应该继续。在我可以按节点搜索下一个最近的任务之前,我们需要所有任务的结果。

我试过做 executor.shutdown() 但用这种方法它不会接受新任务。我们如何在循环中等待,直到每个任务都完成,而不必每次都声明 ThreadPoolExecutor。这样做会破坏通过使用它而不是常规线程来减少开销的目的。

我想到的一件事是添加任务(边缘)的 BlockingQueue。但同样对于这个解决方案,我坚持等待任务在没有 shudown() 的情况下完成。

public void apply(int numberOfThreads) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numberOfThreads);

        class DijkstraTask implements Runnable {

            private String name;

            public DijkstraTask(String name) {
                this.name = name;
            }

            public String getName() {
                return name;
            }

            @Override
            public void run() {
                calculateShortestDistances(numberOfThreads);
            }
        }

        // Visit every node, in order of stored distance
        for (int i = 0; i < this.nodes.length; i++) {

            //Add task for each node
            for (int t = 0; t < numberOfThreads; t++) {
                executor.execute(new DijkstraTask("Task " + t));
            }

            //Wait until finished?
            while (executor.getActiveCount() > 0) {
                System.out.println("Active count: " + executor.getActiveCount());
            }

            //Look through the results of the tasks and get the next node that is closest by
            currentNode = getNodeShortestDistanced();

            //Reset the threadCounter for next iteration
            this.setCount(0);
        }
    }

边数除以线程数。所以 8 个边和 2 个线程意味着每个线程将并行处理 4 个边。

public void calculateShortestDistances(int numberOfThreads) {

        int threadCounter = this.getCount();
        this.setCount(count + 1);

        // Loop round the edges that are joined to the current node
        currentNodeEdges = this.nodes[currentNode].getEdges();

        int edgesPerThread = currentNodeEdges.size() / numberOfThreads;
        int modulo = currentNodeEdges.size() % numberOfThreads;
        this.nodes[0].setDistanceFromSource(0);
        //Process the edges per thread
        for (int joinedEdge = (edgesPerThread * threadCounter); joinedEdge < (edgesPerThread * (threadCounter + 1)); joinedEdge++) {

            System.out.println("Start: " + (edgesPerThread * threadCounter) + ". End: " + (edgesPerThread * (threadCounter + 1) + ".JoinedEdge: " + joinedEdge) + ". Total: " + currentNodeEdges.size());
            // Determine the joined edge neighbour of the current node
            int neighbourIndex = currentNodeEdges.get(joinedEdge).getNeighbourIndex(currentNode);

            // Only interested in an unvisited neighbour
            if (!this.nodes[neighbourIndex].isVisited()) {
                // Calculate the tentative distance for the neighbour
                int tentative = this.nodes[currentNode].getDistanceFromSource() + currentNodeEdges.get(joinedEdge).getLength();
                // Overwrite if the tentative distance is less than what's currently stored
                if (tentative < nodes[neighbourIndex].getDistanceFromSource()) {
                    nodes[neighbourIndex].setDistanceFromSource(tentative);
                }
            }
        }

        //if we have a modulo above 0, the last thread will process the remaining edges
        if (modulo > 0 && numberOfThreads == (threadCounter + 1)) {
            for (int joinedEdge = (edgesPerThread * threadCounter); joinedEdge < (edgesPerThread * (threadCounter) + modulo); joinedEdge++) {
                // Determine the joined edge neighbour of the current node
                int neighbourIndex = currentNodeEdges.get(joinedEdge).getNeighbourIndex(currentNode);

                // Only interested in an unvisited neighbour
                if (!this.nodes[neighbourIndex].isVisited()) {
                    // Calculate the tentative distance for the neighbour
                    int tentative = this.nodes[currentNode].getDistanceFromSource() + currentNodeEdges.get(joinedEdge).getLength();
                    // Overwrite if the tentative distance is less than what's currently stored
                    if (tentative < nodes[neighbourIndex].getDistanceFromSource()) {
                        nodes[neighbourIndex].setDistanceFromSource(tentative);
                    }
                }
            }
        }
        // All neighbours are checked so this node is now visited
        nodes[currentNode].setVisited(true);
    }

谢谢你帮助我!

标签: javamultithreadingthreadpooldijkstrathreadpoolexecutor

解决方案


您应该查看CyclicBarrierCountDownLatch. 这两种方法都允许您阻止线程启动,除非其他线程已发出信号表明它们已完成。它们之间的区别在于 CyclicBarrier可重复使用,即可以多次使用,而CountDownLatch一次性使用则不能重置计数。

从 Javadocs 解释:

CountDownLatch是一种同步辅助工具,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

CyclicBarrier是一种同步辅助工具,它允许一组线程相互等待以达到共同的屏障点。CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程组必须偶尔相互等待。屏障被称为循环的,因为它可以在等待线程被释放后重新使用。

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CyclicBarrier.html

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CountDownLatch.html


推荐阅读