首页 > 解决方案 > 如何知道 ExecutorService 何时完成,ES 上的项目是否可以重新提交给 ES

问题描述

我的 Java 应用程序适用于文件夹中的音乐文件,它旨在并行和独立地处理多个文件夹。为此,每个文件夹都由 ExecutorService 处理,该 ExecutorService 的最大池大小与计算机的 CPU 数量不匹配。

例如,如果我们有 8 个 CPU 的计算机,那么(理论上)可以同时处理 8 个文件夹,如果我们有 16 个 CPU 的计算机,那么可以同时处理 16 个文件夹。如果我们只有 1 个 CPU,那么我们将 pool-size 设置为 3,以允许 CPU 在一个文件夹在 I/O 上阻塞时继续执行某些操作。

但是,我们实际上并没有只有一个 ExecutorService 我们有多个,因为每个文件夹都可以经历多个阶段。

Process1(使用ExecutorService1)→Process2(ExecutorService2)→Process3(ExecutorService3)

进程 1、2、3 等都实现了 Callable,并且都有自己关联的 ExecutorService。我们启动了一个 FileLoader 进程,它加载文件夹,然后为每个文件夹创建一个 Process1 可调用对象并提交给 Process1 执行程序,对于每个 Process1 可调用对象,它将完成其工作,然后提交给不同的可调用对象,这可能是 Process2、Process3等等,但我们永远不会倒退,例如 Process3 永远不会提交给 Process1。我们实际上有12 个进程,但任何特定文件夹都不可能通过所有 12 个进程

但我意识到这是有缺陷的,因为在 16 CPU 计算机的情况下,每个 ES 可以有 16 个池大小,所以我们实际上有 48 个线程在运行,这只会导致太多的争用。

所以我要做的是让所有进程(Process1、Process2…)使用相同的 ExecutorService,这样我们就只会匹配 CPU 的工作线程。

但是,在我目前的情况下,我们有一个 SongLoader 进程,它只提交了一个任务(加载所有文件夹),然后我们调用了 shutdown(),直到所有内容都提交给 Process0,然后关闭() on在将所有内容发送到 Process1 之前,Process0 不会成功,依此类推。

 //Init Services
 services.add(songLoaderService);
 services.add(Process1.getExecutorService());
 services.add(Process2.getExecutorService());
 services.add(Process3.getExecutorService());

 for (ExecutorService service : services)
     //Request Shutdown
     service.shutdown();

     //Now wait for all submitted tasks to complete
     service.awaitTermination(10, TimeUnit.DAYS);
 }
 //...............
 //Finish Off work

但是,如果所有内容都在同一个 ES 上并且 Process1 正在提交给 Process2,这将不再起作用,因为当时调用了 shutdown() 并不是 Process1 将提交给 Process2 的所有文件夹,因此它会过早关闭。

那么,当该 ES 上的任务可以提交给同一 ES 上的其他任务时,如何使用单个 ExecutorService 检测所有工作何时完成?

还是有更好的方法?

注意,你可能会想他为什么不直接将 Process1,2 & 3 的逻辑合并到一个 Process 中。困难在于,虽然我最初按文件夹对歌曲进行分组,但有时歌曲被分成更小的组,它们被分配到不同的进程中,而不是同一个进程,实际上总共有 12 个进程。

基于 Sholms 思想的尝试

主线程

    private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
    private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
    ...
    SongLoader loader = SongLoader.getInstanceOf(parentFolder);
    ExecutorService songLoaderService =  SongLoader.getExecutorService();
    songLoaderService.submit(loader);
    for(Future future : futures)
    {
        try
        {
             future.get();
        }
        catch (InterruptedException ie)
        {
            SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
            getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
        }
        catch(ExecutionException e)
        {
            SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
        }
    }
    songLoaderService.shutdown();

使用MainAnalyserService中的此函数提交新任务的流程代码

public void submit(Callable<Boolean> task) //throws Exception
{
    FixSongsController.getFutures().add(getExecutorService().submit(task));
}

看起来它正在工作,但它失败了

java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.base/java.util.ArrayList$Itr.next(Unknown Source)
    at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
    at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

而且我现在释放我无法调用future.get() 的一个线程(它一直等到完成),而同时其他线程正在添加到列表中。

标签: javaexecutorservicejava.util.concurrentexecutor

解决方案


我同意 Shloim 的观点,即您在这里不需要多个ExecutorService实例——只需一个(大小取决于您可用的 CPU 数量)就足够了,而且实际上是最佳的。实际上,我认为您可能不需要ExecutorService; Executor如果您使用信号完整性的外部机制,一个简单的就可以完成这项工作。

我将首先构建一个类来代表一个更大的工作项的整体。如果您需要使用每个子工作项的结果,您可以使用队列,但如果您只想知道是否还有工作要做,您只需要一个计数器。

例如,您可以执行以下操作:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private int pendingItems;  // guarded by monitor lock on this instance

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public synchronized void enqueueMoreWork(File file) {
        pendingItems++;
        executor.execute(new FileWork(file, this));
    }

    public synchronized void markWorkItemCompleted() {
        pendingItems--;
        notifyAll();
    }

    public synchronized boolean hasPendingWork() {
        return pendingItems > 0;
    }

    public synchronized void awaitCompletion() {
       while (pendingItems > 0) {
           wait();
       }
    }
}

public class FileWork implements Runnable {
    private final File file;
    private final FolderWork parent;

    public FileWork(File file, FolderWork parent) {
        this.file = file;
        this.parent = parent;
    }

    @Override
    public void run() {
        try {
           // do some work with the file

           if (/* found more work to do */) {
               parent.enqueueMoreWork(...);
           }
        } finally {
            parent.markWorkItemCompleted();
        }
    }
}

如果您担心pendingItems计数器的同步开销,可以使用AtomicIntegerfor 代替。然后你需要一个单独的机制来通知等待线程我们完成了;例如,您可以使用CountDownLatch. 这是一个示例实现:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public void enqueueMoreWork(File file) {
        if (latch.getCount() == 0) {
            throw new IllegalStateException(
                "Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
        }
        pendingItems.incrementAndGet();
        executor.execute(new FileWork(file, this));
    }

    public void markWorkItemCompleted() {
        int remainingItems = pendingItems.decrementAndGet();
        if (remainingItems == 0) {
            latch.countDown();
        }
    }

    public boolean hasPendingWork() {
        return pendingItems.get() > 0;
    }

    public void awaitCompletion() {
       latch.await();
    }
}

你会这样称呼它:

Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();

此示例仅显示一级子工作项,但您可以使用任意数量的子工作项,只要它们都使用相同的pendingItems计数器来跟踪还有多少工作要做。


推荐阅读