首页 > 技术文章 > 线程池

nicechen 2021-09-14 17:38 原文

第一章 线程池

1、线程池简介

为什么使用线程池:

  1、用少量的线程执行多个任务——避免内存占用过多;

  2、让线程池中的线程反复执行任务——避免生命周期的损耗。

线程池有好处:

  1)控制线程资源的总量;

  2)复用线程,节省资源,避免频繁销毁线程后又继续创建新的线程;

  3)加快响应速度;

  4)合理利用 CPU 和内存;

  5)统一管理。

如果不用线程池,每个任务都新创建一个线程处理。当任务数量上升到1000个,创建1000个线程去执行,开销太大。我们希望有固定数量的线程,来执行1000个任务,这样就避免了反复创建并销毁线程所带来的开销问题。

2、创建线程池

 

1、corePoolSize 和 maxPoolSize:

 

  1)corePoolSize 指的是核心线程数:线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有新任务到来时,再创建新线程去执行任务。

    • 在线程池中指定创建 5 个线程(核心线程数),没有任务时,这些线程不会被销毁,每当有新的线程任务,先访问线程池中有没有空闲的线程,如果有,就使用该线程,如果没有,就进入等待状态,直到有空闲的线程去执行它。

  2)线程池有可能会再核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是最大量 maxPoolSize 。

 

2、增减线程

  定义:当前任务数量大于 线程池中定义的线程数+队列的长度 线程池就会新创建线程,新创建线程数量最大值为所定义的 maxPoolSize(最大线程数) 减去 corePoolSize (核心线程数) 的值。当任务数量大于 最大线程数+队列长度 ,就导致线程无法接受而采用拒绝策略。

增加线程流程图:

 

3、增减线程的特点

  1. 通过设置 corePoolSize(核心线程数)和 maximumPoolSize 相同,就可以创建固定大小的线程池;
  2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加他;
  3. 通过设置 maximumPoolSize 为很高的值,例如 Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务;
  4. 是只有在队列填满时才创建多于 corePoolSize 的线程,所以如果你使用的是无界队列(例如 LinkedBlockingQueue),那么线程数就不会超过 corePoolSize。(因为当队列满了以后才能创建新的线程)。

 

 4、keepAliveTime

如果线程池当前的线程数多于 corePoolSize,那么如果多余的线程空闲时间超过 keepAliveTime(保持存活时间),它们就会被终止。

 

5、ThreadFactory 

1)ThreadFactor 是线程工厂,是专门用来创建线程的;

2)新的线程是由 ThreadFactory 创建的,默认使用 Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的 NORM_PRIORITY 优先级并且都不是守护线程。如果自己指定 ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等等;

3)通常我们用默认的 ThreadFactory就可以了。

 

6、工作队列

有3种最常见的队列类型:

1)直接交接:SynchronousQueue (没有容量,存不下任务,此时 maxPoolSize 建议设置大一点,因为没有缓存任务的空间,很容易创建新的线程);

2)无界队列:LinkedBlockingQueue (无线容量,使用无限容量就没必要设置更大的 maxPoolSize,因为当线程池中的线程都在忙时,新来的任务就会放到队列中,而无界队列是放不满的。因此无界队列也有隐患,当线程池中的线程执行任务速度赶不上提交上来的任务速度,那么队列中的任务就越来越多,会导致浪费资源或 OOM 异常);

3)有界队列:ArrayBlockingQueue (可以设置队列的容量,当容量满了之后,根据设置的 maxPoolSize 的值来创建新的线程);

4)除此之外还有 DelayQueue、PriorityBlockingQueue 。

 

3、常见线程池的特点和用法

1、创建线程池的五种方式:

以下四种方式被称为:自动创建线程池

1)newFiedThreadPool

  • 使用的是 LinkedBlockingQueue(无界队列)由于传进去的 LinkedBlockingQueue(无界队列)是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,当内存满了时候,会导致 OOM 异常;
  • corePoolSize(核心线程数) 和 maxPoolSize(最大线程数) 的值一样,也就说设定多少个线程,他就只会有多少个线程,不会因为队列容量满了而创建新的线程。

newFiedThreadPool 源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
        //核心线程数和最大线程数都设置为 nThreads(创建线程池传入的参数),列队采用了 LinkedBlockingQueue(无界队列)
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

 

OOM 异常演示:

public class FixedThreadPoolTest {
    public static void main(String[] args) {
        //newFixedThreadPool:无界队列
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(() -> {
                try {
                    //设置线程休眠非常久,让新任务存放于队列中,直到内存满了,报出 OOM 异常
                    Thread.sleep(100000000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
    }
}

 

 

2)newSingleThreadExecutor

  • 观看源码可以看出,这里和 newFixedThreadPool 的原理基本一样,也是使用 LinkedBlockingQueue(无界队列),只不过把corePoolSize(核心线程数) 和 maxPoolSize(最大线程数)直接设置成了 1,所以这也会导致造成占用大量的内存,当内存满了时候,会导致 OOM 异常。

newSingleThreadExecutor 源码:

public static ExecutorService newSingleThreadExecutor() {
        //核心线程数和最大线程数都设置为 1,列队采用了 LinkedBlockingQueue (无界队列)
        return new Executors.FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor演示:

public class SingleThreadExecutor {
    public static void main(String[] args) {
        //executorService:无界队列,无需设置线程数量
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
    }
}

 

 

3)newCachedThreadPool

  • 可缓存的线程池;
  • 特点:无界线程池,具有自动回收多余线程的功能;
  • 弊端:源码的第二个参数 maximumPoolSIze 被设置为了 Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致 OOM 异常。

CachedThreadPool 源码:

public static ExecutorService newCachedThreadPool() {
        //核心线程数设置为 0,最大线程数为最大,列队采用了 SynchronousQueue(无容量)
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

 

newCachedThreadPool 演示:

public class CachedThreadPool {
    public static void main(String[] args) {
        //newCachedThreadPool:无容量队列
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
    }
}

 

 

4)newScheduledThreadPool

  • 支持定时及周期性任务执行的线程池

newScheduledThreadPool  源码:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    //进入 ScheduledThreadPoolExecutor 查看源码
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                new ScheduledThreadPoolExecutor.DelayedWorkQueue());
    }
    //进入 super 查看源码
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }

通过查看源码,newScheduledThreadPool 的 maximumPoolSIze(最大线程数) 被设置为 Integer.MAX_VALUE,核心线程数自行设定。

 

newScheduledThreadPool 演示:

public class ScheduledThreadPoolTest {

    public static void main(String[] args) {
        //newScheduledThreadPool:支持定时及周期性任务执行的线程池
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
        
        //方式一
        //参数一:线程任务  参数二:设置多久后执行  参数三:时间单位
        executorService.schedule(() -> {
            System.out.println(Thread.currentThread().getName());
        },5, TimeUnit.SECONDS);

        
        
        //方式二
        //参数一:线程任务  参数二:多久后开始执行  参数三:每隔多久执行一次  参数四:时间单位
        executorService.scheduleAtFixedRate(() -> {
            System.out.println(Thread.currentThread().getName());
        },1,3,TimeUnit.SECONDS);
    }
}

 

 

5)在JDK1.8引入了新的线程池 ---- workStealingPool 

workStealingPool:

  • 这个线程池和之前的都有很大不同;
  • 它的特点就是能够”窃取“,当有线程执行完自己的任务后,会窃取其他线程产生的子任务来执行,因此,在加锁方面就没那么简单了,也因此,他不会按照顺序来执行队列中的任务;
  • 这中线程池使用的很少,他适合用在能产生子任务的场景,例如 递归、一个任务下还有任务,这些任务下还有其他任务,这种树形结构的。

 

2、线程池里的线程数量设定为多少比较合适?

  • CPU 密集型(加密、计算 hash 等):最佳线程数为 CPU 核心数的 1-2 倍左右。(例如 CPU 是8核的,可以创建8 - 16 个线程);
  • 耗时 IO 型(读写数据库、文件、网络读写等):最佳线程数一般会大于 cpu 核心数很多倍,以 JVM 线程监控显示繁忙情况为依据,保证线程空闲可以衔接上;
  • 线程数 = CPU 核心数 * (1+平均等待时间/平均工作时间)

 

3、以上四种线程池的构造函数的参数

线程池自动创建的线程(允许的情况下)如果没有任务执行空闲下来了,并超过 keepAliveTime (保持存活时间)后,将会被回收。

 

4、停止线程池的正确方法

1、shutdown:

  1)调用了 shutdown 方法,线程池不一定会停止,实际上这个方法是初始化整个关闭过程。当运行了 shutdown 方法,线程池不在接收新的任务,并且等待当前的线程运行完毕之后,再关闭;

  2)调用了 shutdown 方法之后,再新建任务提交给线程池,此时会抛出 RejectedExecutionException 异常,拒绝接收任务。

 

示例:

运行 1.5 秒后关闭线程池,此时线程池不再接收新任务,并把当前任务执行完毕后关闭。

/**
 * 使用shutdown关闭线程池
 */
public class ShutDown {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 50; i++) {
        executorService.execute(() -> {
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        }
        try {
            Thread.sleep(1500);
            //关闭线程池
            executorService.shutdown();
            //关闭后添加新任务
            executorService.execute(() -> {
                System.out.println("错误");
            });
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

运行结果:

 

2、isShutdown

  1)isShutdown 判断线程池是否进入关闭状态。

  2)调用关闭方法前,isShutdown 返回 false,调用关闭方法后,isShutdown 返回 true(不管线程池是否完全关闭了)。

 

示例:

public class ShutDown {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 50; i++) {
        executorService.execute(() -> {
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        }
        try {
            Thread.sleep(1500);
            //查看是否进去关闭状态
            System.out.println(executorService.isShutdown());
            //关闭线程池
            executorService.shutdown();
            //查看是否进入关闭状态
            System.out.println(executorService.isShutdown());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

运行结果:

 

3、isTerminated

  1)isTerminated 判断线程池是否完全关闭

  2)线程池完全关闭之前返回 false,线程池完全关闭之后返回 true,例如调用了 shutdown 方法,使线程池进去关闭状态,但是还未完全关闭,因为要等待线程把当前的任务执行完毕后才完全关闭

  

示例:

public class ShutDown {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
        executorService.execute(() -> {
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        }
        try {
            Thread.sleep(500);
            //查看是否完全关闭
            System.out.println(executorService.isTerminated());
            //关闭线程池
            executorService.shutdown();
            Thread.sleep(2000);
            //查看是否完全关闭
            System.out.println(executorService.isTerminated());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

 

4、awaitTermination

  1)awaitTermination(timeout , TimeUnit) 等待一段时间后判断线程池是否完全关闭

  2)awaitTermination(timeout , TimeUnit)使阻塞式的。

示例:

public class ShuntDwon {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
        executorService.execute(() -> {
            try {
                Thread.sleep(800);
                System.out.println(Thread.currentThread().getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        }
        try {
            System.out.println(executorService.awaitTermination(4L, TimeUnit.SECONDS));
            System.out.println("被 awaitTermination 阻塞了");
            Thread.sleep(1000);
            System.out.println(executorService.awaitTermination(1L, TimeUnit.SECONDS));
            executorService.shutdown();
            Thread.sleep(1000);
            System.out.println(executorService.awaitTermination(1L, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

运行结果:

 

5、shutdownNow

  1)执行了 shutdownNow 会中断线程,线程池会立刻关闭;

  2)会抛出中断线程异常,shutdownNow 是有返回值的,返回的是在队列中的任务(这些任务未被执行),可以将这些返回出来的任务下次再执行。

示例:

public class ShutDown {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
        executorService.execute(() -> {
            try {
                Thread.sleep(800);
                System.out.println(Thread.currentThread().getId());
            } catch (InterruptedException e) {
                //异常处理
                System.out.println(Thread.currentThread().getId() + ":被中断了");
            }
        });
        }
        try {
            Thread.sleep(1500);
            //关闭,并返回队列中的任务
            List<Runnable> runnables = executorService.shutdownNow();
       Thread.sleep(200);
//输出队列中的任务 int size = runnables.size(); for (int i = 0; i < size; i++) { System.out.println(runnables.get(i)); } } catch (InterruptedException e) { e.printStackTrace(); } } }

 

运行结果:

 

5、拒绝任务

1、拒绝时机

  1)当 Executor 关闭时,提交新任务会被拒绝;

  2)当 Executor 对最大线程和工作队列容量使用有限边界并且已经饱和时。

 

2、四种拒绝策略

  1)AbortPolicy

    • 直接抛出异常。

  2)DiscardPolicy

    • 线程池会把新来的任务丢弃,并且不会发出通知,不会处理。

  3)DiscardOldestPolicy

    • 当有新任务时,会把队列中存在最久的任务丢弃掉,从而让新任务加入到队列中。

  4)CallerRunsPolicy(推荐)

    • 让任务提交者自己执行这个任务,例如主线程提交任务给线程池。线程池会把该任务交给主线程去执行。

 

6、钩子方法,给线程池加点料

1、每个线程执行任务前后会执行相应的钩子函数;

2、可以用来打印日志、统计等

 

演示:

/**
 * 演示每个任务执行前后放钩子函数
 */
//实现线程池
public class PauseableThreadPool extends ThreadPoolExecutor {
    //创建锁,后面再详细介绍
    private final static ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition();
    //定义成员变量,作为标记位
    private boolean isPauserd;

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }


    @Override //在执行每个任务之前都会先执行这个函数
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        System.out.println(Thread.currentThread().getId()+":获取锁");
        try {
            //判断是否要暂停
            if (isPauserd) {
                unpaused.await();
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            System.out.println("释放了锁");
            lock.unlock();
        }
    }

    //定义暂停标记方法
    private void pause(){
        lock.lock();
        try{
            isPauserd = true;
        }finally {
            lock.unlock();
        }
    }
    //定义唤醒方法
    public void resume(){
        lock.lock();
        try{
            isPauserd = false;
            //唤醒全部
            unpaused.signalAll();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("我被执行");
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 100; i++) {
            pauseableThreadPool.execute(runnable);
        }
        try {
            Thread.sleep(1500);
            pauseableThreadPool.pause();
            System.out.println("线程池被暂停了");
            Thread.sleep(1500);
            System.out.println("线程池被唤醒");
            pauseableThreadPool.resume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

7、实现原理、源码分析

1、线程池组成部分

  1)线程池管理器:例如关闭线程池;

  2)工作线程:被创建出来执行任务的线程;

  3)任务队列:存放任务的队列;

  4)任务接口(Task):线程执行的任务

 

2、Executor 家族

线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors 等线程池相关的类,他们到底什么关系?

  1)Executor

    • 他是一个顶级结构,里面就只有一个方法( execute(Runnable) ),该方法是用来开启线程。

  部分源码截图:

  

  

  2)ExecutorService

    • 他继承了 Executor 接口,并且添加了一些新的方法。(例如有:关闭线程池、判断线程池是否被关闭等方法)

  部分源码截图:

  

  3)Executors

    • 他是一个工具类,用于快速帮我们创建线程池用的。(例如有:newFixedThreadPool 等方法)
    • Executors 中创建线程池的方法的返回值类型都是 ExecutorService ,而返回值是 ThreadPoolExecutor。

  部分源码截图:

  

 

  4)他们的关系

  

 

3、线程池状态

  1. RUNNING(Running):接受新任务并处理排队任务;
  2. SHUTDOWN(Shutdown):不接受新任务,但处理排队任务;
  3. STOP(Stop):不接受新任务,也不处理排队任务,并中断正在进行的任务;
  4. TIDYING(Tidying):中文是整洁,所有任务都已终止,workerCount 为零时,线程会转换到 TIDYING 状态,并将运行 terminate() 钩子方法;
  5. TERMINATED(Terminated):terminate() 运行完成。

 

8、使用线程池的注意点

 1、避免任务堆积;

2、避免线程数过度增加;

3、排查线程泄露。

推荐阅读