首页 > 解决方案 > 使用 ThreadPool 的 Phaser 未正确到达并等待。我怎样才能解决这个问题?

问题描述

可以在此处找到有关如何使用 Phaser 和遇到的问题的基本概念的 repo:https ://github.com/hipy/phaser/tree/master/src

我一直致力于使用 Phaser 使用 ThreadPools 使 Dijkstra 算法更高效。我用一个循环进行了很多迭代,每次迭代都需要一个 Phaser 来等待 ThreadPool 中的线程完成,然后再继续当前的迭代。

我遇到了一个问题,Phaser 没有正确等待。当我使用 ArriveAndDeregister() 时,Phaser 在每个线程完成后进入终止状态。当我调用 Arrive() 时,未到达方的数量不会减少,因此迭代会卡住。

下面的所有代码都在一个调用一次的 apply() 方法中运行。

下面的代码为要执行的线程创建任务。

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool((numberOfThreads));
        Phaser phaser = new Phaser();

        //Thread class
        class ClosestNodeTask implements Runnable {

            private int start;
            private int end;
            private Phaser phaser;

            public ClosestNodeTask(int start, int end, Phaser phaser) {
                this.start = start;
                this.end = end;
                this.phaser = phaser;
            }

            @Override
            public void run() {
                getNodeShortestDistanced(start, end, phaser); //method calls phaser.arrive() when done
            }
        }

        for (int t = 0; t < numberOfThreads; t++) {
            if (nodesModulo > 0 && numberOfThreads == (t + 1)) {
                start = nodesPerThread * (t);
                end = nodesPerThread * (t + 1) + nodesModulo;
              tasks[t] = new ClosestNodeTask(start, end, phaser);
            } else {
                start = nodesPerThread * t;
                end = nodesPerThread * (t + 1);
                tasks[t] = new ClosestNodeTask(start, end, phaser);
            }
        }

下面的代码在 for 循环中为每次迭代执行。在这种情况下,有 30.000 次迭代:

     phaser.register(); //register main thread
     for(int t = 0; t < tasks.length; t++) {
        phaser.register();
     }
    System.out.println("Phaser unarrived party size is now: " + phaser.getUnarrivedParties());    

跳过一些算法代码,在 for 循环中执行以下代码,启动线程并等待它完成:

  for(int t = 0; t < tasks.length; t++) {
      executor.execute(tasks[t]);
  }

 phaser.arriveAndAwaitAdvance();

输出如下:

Phasecount: 0
Phaser unarrived party size is now: 3
Task size: 2
Adding: 613 //Next closest node in a sub-group, result of work done in a thread
Adding: 2870
all tasks done
-----------------done-------------
Phasecount: 1
Phaser unarrived party size is now: 6
Task size: 2
Adding: 1
Adding: 2870

第一阶段执行,进行完整的迭代。第二阶段卡住了。未到达的派对数量为 6。3 个新派对和显然 3 个旧派对,即使我调用了 phaser.arrive() 也没有注册为到达。ArriveAndAwaitAdvance() 也没有等待,因为在下一次迭代中有 6 个未到达方而不是 3 个。

我尝试使用到达AndDeregister() 但这会导致阶段终止(阶段计数有一个很大的负值)。

我怎么能解决这个问题?我不想终止一个阶段,但我确实想在每次迭代时将各方注册为到达。

谢谢!

标签: javaconcurrencythreadpooldijkstraphaser

解决方案


推荐阅读