首页 > 技术文章 > Executor

gqymy 2019-01-22 13:39 原文

 Executor是一个简单的标准化接口,用于定义自定义线程类子系统,包括线程池,异步I / O和轻量级任务框架。根据正在使用的具体Executor类,任务可以在新创建的线程,现有任务执行线程或线程调用中execute执行,并且可以顺序执行或同时执行。 ExecutorService提供了更完整的异步任务执行框架。ExecutorService管理任务的排队和调度,并允许受控关闭。该ScheduledExecutorService 子接口及相关的接口添加了延迟的和定期任务执行的支持。ExecutorServices提供了安排任何函数的异步执行的方法,表示为Callable结果的模拟Runnable。A Future返回函数的结果,允许确定执行是否已完成,并提供取消执行的方法。A RunnableFuture是Future 拥有一种run方法,在执行时设置其结果。
class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}
class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}
 
Executor接口的execute是在ThreadPoolExecutor中实现的:
 
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
    }
     else if (!addWorker(command, false))
    reject(command);
}
 
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl它表示了两个概念:
workerCount:当前有效的线程数
runState:当前线程池的五种状态,Running、Shutdown、Stop、Tidying、Terminate。
 
ctl.get()方法是在AtomicInteger类中的
 
public final int get() {
    return value;
}
 
 
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
         //由它可以获取到当前有效的线程数和线程池的状态
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // 检查队列是否为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
            firstTask == null &&
            ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
// 首先会再次检查线程池是否处于运行状态,核心线程池中是否还有空闲线程,都满足条件过后则会调用compareAndIncrementWorkerCount先将正在运行的线程数+1,数量自增成功则跳出循环,自增失败则继续从头继续循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
//  否则CAS由于workerCount更改而失败;重试内循环
        }
    }
 
// 正在运行的线程数自增成功后则将线程封装成工作线程Worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
      //全局锁
            final ReentrantLock mainLock = this.mainLock;
      //获取全局锁
            mainLock.lock();
            // 当持有了全局锁的时候,还需要再次检查线程池的运行状态等
            try {
                 //持锁时重新检查。
                //退出线程工厂失败或如果
                //获取锁之前关闭。
                int rs = runStateOf(ctl.get());     //线程池运行状态
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())          //线程处于活跃状态,即线程已经开始执行或者还未死亡,正确的应线程在这里应该是还未开始执行的
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();     //工作线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;         //新构造的工作线程加入成功
                 }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();         //在被构造为Worker工作线程,且被加入到工作线程集合中后,执行线程任务,注意这里的start实际上执行Worker中run方法,所以接下来分析Worker的run方法
                workerStarted = true;
            }
        }
    } finally {
    //未能成功创建执行工作线程
        if (! workerStarted)
            addWorkerFailed(w);     //在启动工作线程失败后,将工作线程从集合中移除
    }
    return workerStarted;
}
 

推荐阅读