6、ThredPoolExecutor
ThredPoolExecutor是基于命令模式下的一个典型的线程池的实现,主要通过一些策略实现一个典型的线程池,目前已知的策略有ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy。废话不多说,我们来看一个示例:
package com.yhj.container.concurrent; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @Described:线程池测试 * @author YHJ create at 2012-4-13 下午01:34:03 * @FileNmae com.yhj.container.concurrent.ThreadPoolExecutorTestCase.java */ public class ThreadPoolExecutorTestCase { private AtomicInteger successTask = new AtomicInteger(0);//成功的任务数目 private AtomicInteger failedTask = new AtomicInteger(0);//失败的任务数目 private Integer thredCount;//启动的线程数 private ThreadPoolExecutor executor; private CountDownLatch latch;//计数器 private CyclicBarrier cyclicBarrier;//集合点 //构造函数 public ThreadPoolExecutorTestCase(BlockingQueue<Runnable> queue,Integer thredCount) { super(); System.out.println("queue name:"+queue.getClass()); this.thredCount=thredCount; executor = new ThreadPoolExecutor(10, 500, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); } //要处理的任务列表 class Task implements Runnable{ private CountDownLatch latch;//计数器 private CyclicBarrier cyclicBarrier;//集合点 public Task(CountDownLatch latch, CyclicBarrier cyclicBarrier) { super(); this.latch = latch; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { cyclicBarrier.await();//到达预期集合点再执行 } catch (Exception e) { e.printStackTrace(); } try { executor.execute(new Runnable() { @Override public void run() { try { Thread.sleep(3000);//休眠3秒 } catch (Exception e) { e.printStackTrace(); } latch.countDown(); successTask.incrementAndGet(); } }); } catch (RejectedExecutionException e) { latch.countDown(); failedTask.incrementAndGet(); } } } //初始化 public void init(){ latch = new CountDownLatch(thredCount); cyclicBarrier = new CyclicBarrier(thredCount); } //启动方法 public void start(){ long startTime = System.currentTimeMillis(); for(int i=0;i<thredCount;++i) new Thread(new Task(latch, cyclicBarrier)).start(); try { latch.await(); executor.shutdownNow(); System.out.println("total time:"+(System.currentTimeMillis()-startTime)); System.out.println("success count:"+successTask.intValue()); System.out.println("failed count:"+failedTask.intValue()); System.out.println("===end==="); } catch (Exception e) { e.printStackTrace(); } } //强制关闭方法 public void shutDonw(){ executor.shutdownNow(); } //主函数 public static void main(String[] args) { //性能优先 速度优先 ThreadPoolExecutorTestCase testCase = new ThreadPoolExecutorTestCase(new SynchronousQueue<Runnable>(), 1000); testCase.init(); testCase.start(); //稳定优先 使用数组缓存队列 testCase=new ThreadPoolExecutorTestCase(new ArrayBlockingQueue<Runnable>(10), 1000); testCase.init(); testCase.start(); //稳定优先 使用链表缓存队列 testCase=new ThreadPoolExecutorTestCase(new LinkedBlockingDeque<Runnable>(10), 1000); testCase.init(); testCase.start(); //关掉处理器 //testCase.shutDonw(); } }
运行结果如下:
我们可以看到,通过缓冲可以提升成功率,但是明显消耗的时间大大增加了。
7、Executors
对于线程池,其实也是我们经常用的一个东西,在多线程环境下,线程池是控制并发操作的一个很好的解决方案,但是每次都通过ThredPoolExecutor未免有点麻烦,因此JDK为我们提供可Executors以便我们能轻松的创建ThredPoolExecutor的实例,我们来看以下有哪些快速创建实例的方法:
以上的方法估计也是大家经常用到的,具体的实例我在此也就不多写了。
8、FutureTask
FutureTask可以用户异步获取数据的一种方法,我们前面提到使用ConcurrentHashMap代替HashMap来提升map的性能,但是我们知道,ConcurrentHashMap在进行读操作的时候基本是不加锁的,假设我们有这么一个需求,我们有一个数据库的连接池,默认是不初始化的,在第一次用户用到的时候进行初始化操作。那我们该如何实现的呢?
package com.yhj.container.concurrent; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.FutureTask; import java.util.concurrent.locks.ReentrantLock; /** * @Described:异步通知测试用例 * @author YHJ create at 2012-4-14 上午11:31:26 * @FileNmae com.yhj.container.concurrent.FutureTaskTestCase.java */ public class FutureTaskTestCase { //测试需求: 使用一个key-value 形式存储 //测试要求: 所有的连接池对象只能创建一次 且不能加载时初始化 在第一次访问时初始化目标连接池对象 //测试实现: 通过HashMap加锁实现和FutureTask实现 //Map测试任务 用例 interface MapTask{ //根据指定的key 获取指定的DB连接 key etc:mysql sqlserver oracle DB2 and so on public Connection getConnection(String key); } //枚举 数据库类型 enum DB_TYPE{ MYSQL(),SQLSERVR,ORACLE,DB2; } //使用HashMap加锁实现 class HashMapWithLock implements MapTask{ private Map<String, DBConnectionPool> pools = new HashMap<String, DBConnectionPool>(); private ReentrantLock lock = new ReentrantLock(); //加锁获取连接对象,防止高并发下数据重复创建 @Override public Connection getConnection(String key){ try { lock.lock(); //锁定操作,后续再来等待 if(!pools.containsKey(key)) pools.put(key, new DBConnectionPool(key)); return pools.get(key).getConnection(); } finally{ lock.unlock();//解锁操作 } } } //使用ConcurrentHashMap实现,因为ConcurrentHashMap读取时不加锁,因此需要通过回调的方式控制并发 class ConcurrentHashMapWithFutureTask implements MapTask{ private ConcurrentHashMap<String, FutureTask<DBConnectionPool>> pools = new ConcurrentHashMap<String, FutureTask<DBConnectionPool>>(); private FutureTask<DBConnectionPool> futureTask; //通过回调的方式 确保多线程下不会引发多次创建 @Override public Connection getConnection(final String key){ try { if(!pools.containsKey(key)){ Callable<DBConnectionPool> callable = new Callable<DBConnectionPool>() { @Override public DBConnectionPool call() throws Exception { pools.put(key,futureTask); return new DBConnectionPool(key); } }; FutureTask<DBConnectionPool> tmpTask = new FutureTask<DBConnectionPool>(callable); futureTask = pools.putIfAbsent(key, tmpTask); if(futureTask==null){ futureTask = tmpTask; futureTask.run(); } } return pools.get(key).get().getConnection(); } catch (Exception e) { e.printStackTrace(); return null; } } } //DB连接池 测试用例供体 class DBConnectionPool{ public DBConnectionPool(String key) { System.out.println("创建了"+key+"类型的数据库连接池"); } //获取DB连接 public Connection getConnection(){ // create Connection for db return new Connection(); } } //DB连接 测试供体 class Connection{ } //任务执行器 待执行的任务 class ExecutorTask implements Runnable{ private CyclicBarrier barrier;//计数器 private CountDownLatch latch;//集合点 private MapTask task;//待执行的任务 private String key; public ExecutorTask(String key,CyclicBarrier barrier, CountDownLatch latch,MapTask task) { this.barrier = barrier; this.latch = latch; this.task = task; this.key=key; } @Override public void run() { try { barrier.await();//到达集合点之前等待 确保数据是并发执行的 Connection connection = task.getConnection(key); if(null==connection) throw new NullPointerException("Null Connection Exception with "+key); latch.countDown(); } catch (Exception e) { e.printStackTrace(); } } } //执行函数 public void execute(String key,int thredCount,MapTask task){ CyclicBarrier barrier = new CyclicBarrier(thredCount); CountDownLatch latch = new CountDownLatch(thredCount); long beginTime = System.currentTimeMillis(); System.out.println("===start "+task.getClass()+"==="); for(int i=0;i<thredCount;++i){ new Thread(new ExecutorTask(key, barrier, latch, task)).start(); } try { latch.await(); System.out.println("====end "+task.getClass()+" still time "+(System.currentTimeMillis()-beginTime)+"==="); } catch (InterruptedException e1) { throw new RuntimeException(e1); } } //启动函数 public void start(){ int thredCount = 200; MapTask hashMapWithLock = new HashMapWithLock(); MapTask concurrentHashMapWithFutureTask = new ConcurrentHashMapWithFutureTask(); execute("mysql",thredCount, hashMapWithLock); execute("sqlserver",thredCount, concurrentHashMapWithFutureTask); } //主函数 public static void main(String[] args) { //启动主进程 new FutureTaskTestCase().start(); //等待所有进程结束 while(Thread.activeCount()>1){ Thread.yield(); } } }
执行结果如下:
我们看到对象值创建了一次,但是通过回调的方式速度会慢一点,毕竟是异步的,有一部分线程需要等待,但是在多线程的模式下,显然我们可以规避单点访问堆积过大的问题。