对线程池的理解
在没有引入线程池之前,如果去创建多线程,就会出现这几种情况:第一,创建现场本身就占用CPU资源,给CPU带来压力;第二,线程本身也要占用内存空间,大量的线程会占用内存资源并且可能会导致Out of Memory。第三,线程调用结束后,大量的线程回收也会给GC带来很大的压力。第四,频繁的创建和销毁线程会降低系统的效率。
这个时候,线程池就应运而生,为了避免重复的创建线程,线程池的出现可以让线程进行复用。
ThreadPoolExecutor的核心字段
private final BlockingQueue<Runnable> workQueue:一个阻塞队列,用来存储等待执行的任务,当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。
private final HashSet<Worker> workers:用来存储在线程池中的所有Worker的集合。
private int largestPoolSize:线程池最大可实现的线程数量。
private volatile RejectedExecutionHandler handler:表示当拒绝处理任务时的策略,saturated 或者 shutdown被执行时被调用。
private volatile long keepAliveTime:表示线程没有任务时最多保持多久然后停止。默认情况下,只有线程池中线程数大于corePoolSize 时,keepAliveTime 才会起作用。换句话说,当线程池中的线程数大于corePoolSize,并且一个线程空闲时间达到了keepAliveTime,那么就是shutdown。
private volatile int corePoolSize:线程池核心线程数量。可以理解为允许线程池中允许同时运行的最大线程数。
private volatile int maximumPoolSize:线程池最大可容线程数量。
private final ReentrantLock mainLock:线程池可重入锁。
private static final RejectedExecutionHandler defaultHandler:线程池拒绝策略。
线程池的状态以及Bit值
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
最核心的execute()方法
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. *
如果线程池中的线程数量小于corePollSize,则线程池会尝试去创建一个新的线程,并把这个command作为第一个任务来执行。
在调用addWorker方法时,会对runState和workerCount做原子性检查。如果检查失败,addWorker方法会阻止添加线程,并返回false。
* 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. *
如果一个任务成功进入阻塞队列,那么我们需要进行一个双重检查来确保是我们已经添加一个线程(因为存在着一些线程在上次检查后他已经死亡)
或者当我们进入该方法时,该线程池已经关闭。
所以,我们将重新检查状态,线程池关闭的情况下则回滚入队列,线程池没有线程的情况则创建一个新的线程。
* 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task.
如果任务无法入队列(队列满了),那么我们将尝试新开启一个线程(从corepoolsize到扩充到maximum),如果失败了,那么可以确定原因,要么是
线程池关闭了或者饱和了(达到maximum),所以我们执行拒绝策略
*/
//AtomicInteger ctl,为原子操作类,在多线程下实现安全的自增自剑,排除了多线程下的可见性和指令重排序问题。 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true))
//执行成功,则返回 return; c = ctl.get(); }
//检查线程池状态,如果是运行状态,则进入阻塞队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
//进行到这里有两种情况,1,线程池状态不是RUNNING,2,线程池workerCount >= corePoolSize并且workQueue已满,需要扩容corePoolSize到maximumPoolSize else if (!addWorker(command, false)) reject(command); }
excute()方法执行的逻辑入下图
现在看看addWorker方法的具体实现
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread#start), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry://外层循环,判断线程池状态 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //内层循环,主要是对worker数量加一 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //主要是把Runnable封装到Worker中,并添加到WorkerSet集合中。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) {
//启动线程 t.start(); workerStarted = true; } } } finally {
//添加失败处理 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }