首页 > 技术文章 > Java并发包中常用类小结(二)

longshiyVip 2016-02-23 22:12 原文

6ThredPoolExecutor

ThredPoolExecutor是基于命令模式下的一个典型的线程池的实现,主要通过一些策略实现一个典型的线程池,目前已知的策略有ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy。废话不多说,我们来看一个示例:

package com.yhj.container.concurrent;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * @Described:线程池测试
 * @author YHJ create at 2012-4-13 下午01:34:03
 * @FileNmae com.yhj.container.concurrent.ThreadPoolExecutorTestCase.java
 */
public class ThreadPoolExecutorTestCase {
   
    private AtomicInteger successTask = new AtomicInteger(0);//成功的任务数目
   
    private AtomicInteger failedTask = new AtomicInteger(0);//失败的任务数目
   
    private Integer thredCount;//启动的线程数
 
    private ThreadPoolExecutor executor;
   
    private CountDownLatch latch;//计数器
   
    private CyclicBarrier cyclicBarrier;//集合点
   
    //构造函数
    public ThreadPoolExecutorTestCase(BlockingQueue<Runnable> queue,Integer thredCount) {
       super();
       System.out.println("queue name:"+queue.getClass());
       this.thredCount=thredCount;
       executor = new ThreadPoolExecutor(10, 500, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    }
   
    //要处理的任务列表
    class Task implements Runnable{
      
       private CountDownLatch latch;//计数器
      
       private CyclicBarrier cyclicBarrier;//集合点
      
       public Task(CountDownLatch latch, CyclicBarrier cyclicBarrier) {
           super();
           this.latch = latch;
           this.cyclicBarrier = cyclicBarrier;
       }
 
       @Override
       public void run() {
           try {
              cyclicBarrier.await();//到达预期集合点再执行
           } catch (Exception e) {
              e.printStackTrace();
           }
           try {
              executor.execute(new Runnable() {
                 
                  @Override
                  public void run() {
                     try {
                         Thread.sleep(3000);//休眠3秒
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                     latch.countDown();
                     successTask.incrementAndGet();
                  }
              });
           } catch (RejectedExecutionException e) {
              latch.countDown();
              failedTask.incrementAndGet();
           }
          
       }
      
    }
   
    //初始化
    public void init(){
       latch = new CountDownLatch(thredCount);
       cyclicBarrier = new CyclicBarrier(thredCount);
    }
   
    //启动方法
    public void start(){
       long startTime = System.currentTimeMillis();
       for(int i=0;i<thredCount;++i)
           new Thread(new Task(latch, cyclicBarrier)).start();
       try {
           latch.await();
           executor.shutdownNow();
           System.out.println("total time:"+(System.currentTimeMillis()-startTime));
           System.out.println("success count:"+successTask.intValue());
           System.out.println("failed count:"+failedTask.intValue());
           System.out.println("===end===");
        } catch (Exception e) {
           e.printStackTrace();
       }
    }
   
    //强制关闭方法
    public void shutDonw(){
       executor.shutdownNow();
    }
   
    //主函数
    public static void main(String[] args) {
       //性能优先 速度优先
       ThreadPoolExecutorTestCase testCase = new ThreadPoolExecutorTestCase(new SynchronousQueue<Runnable>(), 1000);
       testCase.init();
       testCase.start();
       //稳定优先  使用数组缓存队列
       testCase=new ThreadPoolExecutorTestCase(new ArrayBlockingQueue<Runnable>(10), 1000);
       testCase.init();
       testCase.start();
       //稳定优先  使用链表缓存队列
       testCase=new ThreadPoolExecutorTestCase(new LinkedBlockingDeque<Runnable>(10), 1000);
       testCase.init();
       testCase.start();
       //关掉处理器
       //testCase.shutDonw();
    }
 
}

运行结果如下:

我们可以看到,通过缓冲可以提升成功率,但是明显消耗的时间大大增加了。

7Executors

对于线程池,其实也是我们经常用的一个东西,在多线程环境下,线程池是控制并发操作的一个很好的解决方案,但是每次都通过ThredPoolExecutor未免有点麻烦,因此JDK为我们提供可Executors以便我们能轻松的创建ThredPoolExecutor的实例,我们来看以下有哪些快速创建实例的方法:

以上的方法估计也是大家经常用到的,具体的实例我在此也就不多写了。

8FutureTask

FutureTask可以用户异步获取数据的一种方法,我们前面提到使用ConcurrentHashMap代替HashMap来提升map的性能,但是我们知道,ConcurrentHashMap在进行读操作的时候基本是不加锁的,假设我们有这么一个需求,我们有一个数据库的连接池,默认是不初始化的,在第一次用户用到的时候进行初始化操作。那我们该如何实现的呢?

package com.yhj.container.concurrent;
 
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.ReentrantLock;
 
/**
 * @Described:异步通知测试用例
 * @author YHJ create at 2012-4-14 上午11:31:26
 * @FileNmae com.yhj.container.concurrent.FutureTaskTestCase.java
 */
public class FutureTaskTestCase {
 
    //测试需求: 使用一个key-value 形式存储
    //测试要求: 所有的连接池对象只能创建一次 且不能加载时初始化 在第一次访问时初始化目标连接池对象
    //测试实现: 通过HashMap加锁实现和FutureTask实现
 
    //Map测试任务 用例
    interface MapTask{
       //根据指定的key 获取指定的DB连接  key etc:mysql sqlserver oracle DB2 and so on
       public Connection getConnection(String key);
    }
 
    //枚举 数据库类型
    enum DB_TYPE{
       MYSQL(),SQLSERVR,ORACLE,DB2;
    }
 
    //使用HashMap加锁实现
    class HashMapWithLock implements MapTask{
 
       private Map<String, DBConnectionPool> pools = new HashMap<String, DBConnectionPool>();
 
       private ReentrantLock lock = new ReentrantLock();
 
       //加锁获取连接对象,防止高并发下数据重复创建
       @Override
       public Connection getConnection(String key){
           try {
              lock.lock(); //锁定操作,后续再来等待
              if(!pools.containsKey(key))
                  pools.put(key, new DBConnectionPool(key));
              return pools.get(key).getConnection();
           } finally{
              lock.unlock();//解锁操作
           }
       }
    }
 
    //使用ConcurrentHashMap实现,因为ConcurrentHashMap读取时不加锁,因此需要通过回调的方式控制并发
    class ConcurrentHashMapWithFutureTask implements MapTask{
 
       private ConcurrentHashMap<String, FutureTask<DBConnectionPool>> pools = new ConcurrentHashMap<String, FutureTask<DBConnectionPool>>();
 
       private FutureTask<DBConnectionPool> futureTask;
 
       //通过回调的方式 确保多线程下不会引发多次创建
       @Override
       public Connection getConnection(final String key){
           try {
              if(!pools.containsKey(key)){
                  Callable<DBConnectionPool> callable = new Callable<DBConnectionPool>() {
 
                     @Override
                     public DBConnectionPool call() throws Exception {
                         pools.put(key,futureTask);
                         return new DBConnectionPool(key);
                     }
                  };
                  FutureTask<DBConnectionPool> tmpTask = new FutureTask<DBConnectionPool>(callable);
                  futureTask = pools.putIfAbsent(key, tmpTask);
                  if(futureTask==null){
                     futureTask = tmpTask;
                     futureTask.run();
                  }
              }
              return pools.get(key).get().getConnection();
           } catch (Exception e) {
              e.printStackTrace();
              return null;
           }
       }
    }
 
    //DB连接池  测试用例供体
    class DBConnectionPool{
 
       public DBConnectionPool(String key) {
           System.out.println("创建了"+key+"类型的数据库连接池");
       }
 
       //获取DB连接
       public Connection getConnection(){
           // create Connection for db
           return new Connection();
       }
    }
 
    //DB连接 测试供体
    class Connection{
    }
 
    //任务执行器 待执行的任务
    class ExecutorTask implements Runnable{
 
       private CyclicBarrier barrier;//计数器
 
       private CountDownLatch latch;//集合点
 
       private MapTask task;//待执行的任务
 
       private String key;
 
       public ExecutorTask(String key,CyclicBarrier barrier, CountDownLatch latch,MapTask task) {
           this.barrier = barrier;
           this.latch = latch;
           this.task = task;
           this.key=key;
       }
 
       @Override
       public void run() {
           try {
              barrier.await();//到达集合点之前等待 确保数据是并发执行的
              Connection connection = task.getConnection(key);
              if(null==connection)
                  throw new NullPointerException("Null Connection Exception with "+key);
              latch.countDown();
           } catch (Exception e) {
              e.printStackTrace();
           }
       }
 
    }
 
    //执行函数
    public void execute(String key,int thredCount,MapTask task){
       CyclicBarrier barrier = new CyclicBarrier(thredCount);
       CountDownLatch latch = new CountDownLatch(thredCount);
       long beginTime = System.currentTimeMillis();
       System.out.println("===start "+task.getClass()+"===");
       for(int i=0;i<thredCount;++i){
           new Thread(new ExecutorTask(key, barrier, latch, task)).start();
       }
       try {
           latch.await();
           System.out.println("====end "+task.getClass()+" still time "+(System.currentTimeMillis()-beginTime)+"===");
       } catch (InterruptedException e1) {
           throw new RuntimeException(e1);
       }
    }
 
    //启动函数
    public void start(){
       int thredCount = 200;
       MapTask hashMapWithLock = new HashMapWithLock();
       MapTask concurrentHashMapWithFutureTask = new ConcurrentHashMapWithFutureTask();
       execute("mysql",thredCount, hashMapWithLock);
       execute("sqlserver",thredCount, concurrentHashMapWithFutureTask);
    }
 
    //主函数
    public static void main(String[] args) {
       //启动主进程
       new FutureTaskTestCase().start();
       //等待所有进程结束
       while(Thread.activeCount()>1){
           Thread.yield();
       }
    }
 
}

执行结果如下:

我们看到对象值创建了一次,但是通过回调的方式速度会慢一点,毕竟是异步的,有一部分线程需要等待,但是在多线程的模式下,显然我们可以规避单点访问堆积过大的问题。

推荐阅读