首页 > 解决方案 > 同步线程池ExecutorService

问题描述

这是我第一次使用线程池,我不太明白 executorservice 是如何工作的。我将水印放在图像上并将它们合并到一张空白图片上。但即使我只使用一个线程,它仍然只会画一半。

这是我的WorkerThread 类

public class WorkerThread implements Runnable {

    BufferedImage source;
    BufferedImage toDraw;
    int x;
    int y;
    BufferedImage target;
    ParallelWatermarkFilter pf;

    public WorkerThread(BufferedImage source, BufferedImage toDraw, int x, int y, BufferedImage target){
        this.source = source;
        this.toDraw = toDraw;
        this.x = x;
        this.y = y;
        this.target = target;
        pf = new ParallelWatermarkFilter(source, 5);
    }

    @Override
    public void run() {
        pf.mergeImages(source, toDraw, x, y, target);
    }
}

这就是我在FilterClass中使用 ExecutorService 的方式:

    public BufferedImage apply(BufferedImage input) {

        ExecutorService threadpool = Executors.newFixedThreadPool(numThreads);

                for (int w = 0; w < imgWidth; w += watermarkWidth) {
      for (int h = 0; h < imgHeight; h += watermarkHeight) {
            Runnable worker = new WorkerThread(input, watermark, w, h, result);
            System.out.println("WIDTH: " + w + "   HEIGHT: " + h);
            threadpool.execute(worker);
      }
    }

    threadpool.shutdown();

线程不等到一个线程完成吗?

标签: javamultithreadingsynchronizationthreadpool

解决方案


事情ThreadPoolExecutor关闭和任务执行/排空工作队列/从工作队列中取出是活泼的。所以你不能依赖线程中断机制或其他东西。您得到的保证是:

启动有序关闭,其中执行先前提交的任务,但不会接受新任务。如果已经关闭,调用没有额外的效果。

此方法不等待先前提交的任务完成执行。

为了更深入地研究ThreadPoolExecutor实现,让我们看一下主要的执行方法:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

这里的关键部分是调用getTask(). 它的片段是:

 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
     decrementWorkerCount();
     return null;
 }

该方法不同步,仅依赖于 CASctl值提供的排序。ctl这是存储在内部的全局池状态AtomicInteger(用于非阻塞原子ThreadPoolExecutor状态获取)。

所以下面的情况是可能的。

  1. 工作线程调用getTask
  2. 工作线程获取池的运行状态。它仍然是RUNNING
  3. 另一个线程启动了订单关闭并进行了相应的修改ctl
  4. 工作线程已经从工作队列中获取了一个任务。

推荐阅读