首页 > 技术文章 > 线程池

chenglc 2020-08-01 13:59 原文

java线程池

合理使用线程池的好处

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
  2. 提高执行速度。当任务到达时,任务可以不需要等待线程创建就能立即执行;
  3. 提高线程的可管理性。线程是系统重要资源,线程池可以控制线程数量,避免独自创建线程,引发系统问题。

核心类-ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

ThreadPoolExecutor类的具体实现源码:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器。

核心构造器为

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

主要参数:

  • corePoolSize:核心池的大小
  • maximumPoolSize:线程池最大线程数
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性
  • workQueue:一个阻塞队列,用来存储等待执行的任务
  • threadFactory:线程工厂,主要用来创建线程,设置线程名
  • handler:表示当拒绝处理任务时的策略

unit的值

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

workQueue的值

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

handler的值-任务饱和处理策略

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 

任务运行逻辑

ThreadPoolExecutor执行execute()方法来提交任务;

线程池节点到任务后有四种处理场景:

  1. 判断核心线程池是否有空闲线程。如果有空闲线程,直接让任务搭车运行;如果核心线程已满,进入下一个流程;
  2. 判断任务队列是否已满。如果没有满,就进入队列,否则进入下一个流程;
  3. 判断整个线程池的线程数是否达到上限(BlockingQueue已满)。如果没有就开启一个新的线程(需要获取全局锁),否则(危险状态)交给饱和策略来处理;
  4. RejectedExecutionHandler.rejectedExecution()方法来处理多出来的任务。

以上步骤的的设计思路是为了避免使用全局锁。

image

ThreadPoolExecutor执行execute()的流程

image

源码分析

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
     // 获取运行的核心线程数
    int c = ctl.get();
    //如果运行的核心线程数小于和核心线程数,就加入核心线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果核线程池处于正常运行,并且可以加入等待队列(队列未满)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //再次检查线程池状态,如果线程池处于非运行状态(死掉了,或者线程进入方法后就关闭了  (because existing ones died since last checking) or that  the pool shut down since entry into this method)
        //就执行回滚策略(移除队列元素),然后  执行任务淘汰策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
            // 如果当前空闲线程为0,则创建建一个新的线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果添加任务失败,则执行淘汰策略
    else if (!addWorker(command, false))
        reject(command);
}

创建线程时加锁源码

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}

任务执行:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //循环获取队列任务,如果不为空,就执行任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。

每个线程执行流程:

  • 创建线程时,执行拥有的任务,然后取队列中的任务执行。

image

配置线程池的参数

根据任务特性来分析:

  • cpu密集型
  • IO密集型
  • 混合型

CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。

由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu

可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。

线程池的监控

可以根据api提供的方法来获取线程池的一些数据来对线程池进行监控;
taskCount:线程池需要执行的任务数量。

  • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
  • largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
  • getActiveCount:获取活动的线程数。

推荐阅读