首页 > 技术文章 > java基础-多线程二

hetangyuese 2019-09-05 17:26 原文

java基础-多线程二

继承thread和实现Runnable的多线程每次都需要经历创建和销毁的过程,频繁的创建和销毁大大影响效率,线程池的诞生就可以很好的解决这一个问题,线程池可以充分的利用线程进行任务调度,重复利用

1. 线程池Executor与Executors

Executor为concurrent包下的线程顶级接口,提供了一个execute方法,参数为Runnable的实例

void execute(Runnable command);

Executors负责创建不同功能的线程池,线程工厂的角色,都是返回ExecutorService

2. 线程池的日常使用

2.1 newFixedThreadPool:创建指定大小的线程池,当有新任务提交时,如果线程池存在空闲则立即执行,反之则会加入阻塞队列中进行等待,等待唤醒并执行;适用于多任务异步执行

// 创建一个10个活跃线程池
ExecutorService service = Executors.newFixedThreadPool(10);

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2.2 newSingleThreadExecutor: 只有一个线程的线程池,在发生异常的时候会创建一个新的线程池继续执行, 返回FinalizableDelegatedExecutorService实例;核心线程数为1,最大线程数也为1,空闲线程等待时间为0,LinkedBlockingQueue无界等待队列(链表结构) 适用于单个任务执行

ExecutorService service = Executors.newSingleThreadExecutor();

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
public class TestSingleThreadExecutor {

    static class Test implements Runnable {
        private Integer num;
        private String threadType;

        Test(Integer param, String threadType) {
            this.num = param;
            this.threadType = threadType;
        }

        @Override
        public void run() {
            System.out.println(threadType + "num=" + num + "==>" + Thread.currentThread().getName() + "线程计算=" + num / (num - 2));
        }
    }

    public static void main(String args[]) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        ExecutorService service1 = Executors.newFixedThreadPool(4);
        for (int i = 10; i >= 1; i--) {
            service.execute(new Test(i, "newSingleThreadExecutor"));
//            service1.execute(new Test(i, "newFixedThreadPool"));
        }
    }
}

// 执行结果:
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
	at com.hervey.thread.TestSingleThreadExecutor$Test.run(TestSingleThreadExecutor.java:28)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
newSingleThreadExecutornum=10==>pool-1-thread-1线程计算=1
newSingleThreadExecutornum=9==>pool-1-thread-1线程计算=1
newSingleThreadExecutornum=8==>pool-1-thread-1线程计算=1
newSingleThreadExecutornum=7==>pool-1-thread-1线程计算=1
newSingleThreadExecutornum=6==>pool-1-thread-1线程计算=1
newSingleThreadExecutornum=5==>pool-1-thread-1线程计算=1
newSingleThreadExecutornum=4==>pool-1-thread-1线程计算=2
newSingleThreadExecutornum=3==>pool-1-thread-1线程计算=3
newSingleThreadExecutornum=1==>pool-1-thread-2线程计算=-1

2.3 newCachedThreadPool:  当线程池中无可用线程时,创建和任务等同数量的线程,后续有新任务进来时,如果线程池中之前执行的线程还未被回收,则不会创建新线程直接复用之前的线程(新任务的数量<=线程池中已完成单位被回收的线程),这就是cache线程池;适用于耗时较短的任务

// SynchronousQueue同步队列用来存储等待的队列
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
public class TestCacheThreadPool {
    static class Test implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "正在执行");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i< 5; i++) {
            service.execute(new Test());
        }
        Thread.sleep(2000);
        System.out.println("沉睡2秒");
        for (int i = 0; i < 3; i++) {
            service.execute(new Test());
        }
    }
}
// 执行结果:
pool-1-thread-3正在执行
pool-1-thread-2正在执行
pool-1-thread-1正在执行
pool-1-thread-5正在执行
pool-1-thread-4正在执行
沉睡2秒
pool-1-thread-4正在执行
pool-1-thread-5正在执行
pool-1-thread-1正在执行
// 如果线程池中的线程可用数量小于新任务的数量,则会一直复用线程池中可用的线程,没有可用的时候则会创建新的线程
public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i< 5; i++) {
            service.execute(new Test());
        }
        Thread.sleep(2000);
        System.out.println("沉睡2秒");
        for (int i = 0; i < 10; i++) {
            service.execute(new Test());
        }
    }
// 执行结果:
pool-1-thread-1正在执行
pool-1-thread-2正在执行
pool-1-thread-3正在执行
pool-1-thread-2正在执行
pool-1-thread-4正在执行
沉睡2秒
pool-1-thread-1正在执行
pool-1-thread-2正在执行
pool-1-thread-3正在执行
pool-1-thread-4正在执行
pool-1-thread-2正在执行
pool-1-thread-1正在执行
pool-1-thread-3正在执行
pool-1-thread-4正在执行
pool-1-thread-5正在执行
pool-1-thread-6正在执行

2.4 newScheduledThreadPool: 可以设置计划时间执行的线程池,类似于quartz-scheduler定时任务,可以设置延时执行,每隔多久执行一次

// 使用newScheduledThreadPool
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
// 源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
//DelayedWorkQueue 延迟队列
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

ScheduledExecutorService定时方法

// 延迟执行 Runnable实例,delay延时时长,TimeUnit时间单位
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

// Callable实例
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

// Runnable实例,initialDelay:初始化延时时间, period:每隔多长时间执行,TimeUnit:时间单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                                  long period, TimeUnit unit);

// Runnable实例,initialDelay:首次延迟时间, delay:后续延迟时间执行,TimeUnit:时间单位
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                                     long delay, TimeUnit unit);

上述都是日常比较常见的各种线程池的用法及作用,我们需要根据我们的使用场景来灵活调用,我们可以查看下源代码,线程池是怎么创建的及各参数的具体意义

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

线程池的各参数的概念及作用

  • corePoolSize 初始化线程池的线程数量,只有在工作队列超过这个数值时才会创建新的线程(不超过最大线程数量)

  • maximumPoolSize 线程池的最大数量

  • keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以通过设置allowCoreThreadTimeOut(true)使得核心线程有效时间到达是关闭线程

  • TimeUnit keepAliveTime 设置的时间单位

  • BlockingQueue 阻塞等待队列

  • ThreadFactory 线程工厂,用于创建新线程

  • RejectedExecutionHandler 当workQueue阻塞队列和maximumPoolSize都满了时执行拒绝策略,拒绝任务执行

    1. CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。

    2. AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。

    3. DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。

    4. DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。

3. 线程池流程图

推荐阅读