首页 > 技术文章 > JUC学习笔记(二):集合类、Callable和读写锁

renzhongpei 2020-05-23 21:34 原文

集合类不安全问题

List

  • ArrayList在并发下是不安全的,就会引发java.util.ConcurrentModificationException 并发修改异常。

  • 解决方案:

  1. 使用Vector,Vector是线程安全的 List<Object> list = new Vector<>();

    • 解决方案是在add方法添加synchronized

    • 但是Vector是JDK1.0就出来了,ArrayList是1.2才出来的,意味着Vector其实是一个过时的用法

  2. 使用Collections.synchronizedList,让ArrayList变得安全 Collections.synchronizedList(new ArrayList<>());

    • Collections.synchronizedList看源码发现其实是单例模式(final),然后所有的操作方法都是对这个对象加了synchronized

  3. 使用new CopyOnWriteArrayList<>();

    • 1.5出现

    • CopyOnWrite 写入时赋值,COW 是计算机程序设计领域的一种优化策略

    • 多线程调用的时候,写入时会赋值一份,写好了再插入进去

    • CopyOnWriteArrayList比Vector优秀在:

      • Vector使用的是Synchronized

      • CopyOnWrite使用的是Lock,效率更高

package com.rzp.unsafe;
​
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
​
​
public class ListTest {
    public static void main(String[] args) {
//        List<String> strings = Arrays.asList("1", "2", "3");
//        strings.forEach(System.out::print);
//执行会报java.util.ConcurrentModificationException 并发修改异常!
        //ArrayList在并发下是不安全的,就会引发这个异常
//        ArrayList<Object> list = new ArrayList<>();
        /**
         * 解决方案:
         * 1.这个时候可以使用Vector,Vector是线程安全的    List<Object> list = new Vector<>();
         *   但是Vector是JDK1.0就出来了,ArrayList是1.2才出来的,解决方案是在add方法添加synchronized
         * 2.使用Collections.synchronizedList,让ArrayList变得安全 Collections.synchronizedList(new ArrayList<>());
         *      源码单例模式(final),然后所有的操作方法都是对这个对象加了synchronized
         * 3.使用new CopyOnWriteArrayList<>();
         *      CopyOnWrite 写入时赋值,COW 是计算机程序设计领域的一种优化策略
         *
         */
        List<Object> list = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println(list);
            },String.valueOf(i)).start();
        }
​
    }
}

 

Set

  • 问题和解决方案都和List一样

  • 底层的add走的是CopyOnWriteList的addIfAbsent的方法。

package com.rzp.unsafe;
​
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
​
public class SetTest {
​
    public static void main(String[] args) {
        //普通的Set也是有相同的ConcurrentModificationException
//        Set<Object> set = new HashSet<>();
        /**
         * 解决方案:
         * 1.Collections.synchronizedSet(new HashSet<>());
         * 2.new CopyOnWriteArraySet<>();
         */
        Set<Object> set = new CopyOnWriteArraySet<>();
​
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(set);
            }, String.valueOf(i)).start();
        }
    }
}

 


Map

package com.rzp.unsafe;
​
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
​
public class MapTest {
    public static void main(String[] args) {
        /**
         *  HashMap实际工作中不会用无参构造,HashMap实际有3个构造方法
         *  1.public HashMap(int initialCapacity)
         *  2.public HashMap(int initialCapacity, float loadFactor)
         *  参数:
         *   initialCapacity the initial capacity  初始化容量
         *   loadFactor      the load factor  加载因子
         *  无参构造等价于: new HashMap<>(16,0.75);
         */
​
​
        /**
         * new HashMap<>(16,0.75f); 同样也会有ConcurrentModificationException
         * 解决方案:
         * 1.Collections.synchronizedMap(new HashMap<>());
         * 2.new ConcurrentHashMap<>();
         *   实现原理就是对结点增加synchronized
         */
        Map<String, String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 30; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
                System.out.println(map);
            },String.valueOf(i)).start();
​
        }
​
​
    }
}

 

拓展

HashSet底层就是HashMap,使用了HashMap的Key。

Callable

  • 类似于Runnable,也是为了执行另一个线程设计的。

  • 不同点在于Callable可以有返回值,可以抛出异常,并且重写的不是run方法而是call方法。

 

package com.rzp.callablet;
​
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
​
public class Demo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //Thread只能启动Runnable,因此要把Callable转换成Runnable
        // Runnable有个实现类FutureTask,而FutureTask的构造方法可以传入Callable
        //new Thread(new Runnable()).start();
        //new Thread(new FutureTask<V>()).start();
        //new Thread(new FutureTask<V>(Callable)).start();
​
        MyTrhead myTrhead = new MyTrhead();
        //适配类
        FutureTask futureTask = new FutureTask(myTrhead);
        new Thread(futureTask,"A").start();
        //虽然调用两个线程,但是是有缓存,实际只执行了一次。
        new Thread(futureTask,"B").start();
        //获取返回值
        String result = (String) futureTask.get();
        System.out.println(result);
​
    }
}
​
/**
 * Callable<String>泛型的参数就是返回值的类型
 */
class MyTrhead implements Callable<String>{
​
    @Override
    public String call() throws Exception {
        System.out.println("running call");
        return "1024";
    }
}
​

 

常用的辅助类

1.CountDownLatch

  • 就是一个倒数计数器,可以让线程在倒数完成以前先暂停执行,倒数完成后继续执行。

package com.rzp.callablet;
​
import java.util.concurrent.CountDownLatch;
​
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        //创建倒数器,总数是6
        CountDownLatch count = new CountDownLatch(6);
​
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName());
                count.countDown();//数量-1
            }, String.valueOf(i)).start();
​
        }
        count.await(); // 等待结束,如果没有这行代码,下面的代码会在线程开启后直接执行,开启了以后就会在计数器归零以后才会往下执行
        System.out.println("Close Door");
​
    }
}
​

 

2.CyclicBarrier

  • 加法计数器。

package com.rzp.callablet;
​
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
​
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //7颗龙珠召唤神龙
        CyclicBarrier cycli = new CyclicBarrier(7, () -> {
            System.out.println("召唤神龙");
        });
        for (int i = 0; i < 7; i++) {
            //内部类无法获取外部变量,要通过定义常量获得
            final int temp = i;
            new Thread(() -> {
                System.out.println(temp+1);
                try {
                    cycli.await();//让线程等待,直到达到计数器执行完计数器的内容后再唤醒
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(temp+"after wait");
            }).start();
        }
    }
}
​
/* output
1
2
4
3
5
6
7
召唤神龙
1after wait
4after wait
2after wait
6after wait
3after wait
0after wait
5after wait
*/

 

3.Semaphore

  • 可以定义N个许可证,拿到许可证的线程才能执行,其他会等待,知道释放许可证,再唤醒其他线程。

  • 可以限流

package com.rzp.callablet;
​
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
​
public class SemaphoreDemo {
    public static void main(String[] args) {
        //许可证数量
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                //获得许可证
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"获得许可证");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"离开车位");
​
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //释放许可证
                    semaphore.release();
                }
            }).start();
        }
    }
}
​

 

 

读写锁

  • ReadWriteLock

  • 提供一对关联的lock

    • 1个用于读,可以被多个线程获取。又被称为共享锁

    • 1个用于写,只能被1个线程写。又被称为独占锁。

  • 共享锁和不加锁的区别:加了的话,如果你正在读,即使其他线程逻辑到了写锁这里,必须等你读完了才开始写,保证了你读到的是写之前的,保证了一致性

  • 这样可以提高读的性能

public class ReadWriteLockDemo {
    public static void main(String[] args) {
//        MyCache myCache = new MyCache();
        MyCacheLock myCache = new MyCacheLock();
​
        for (int i = 1; i < 6; i++) {
            int temp = i;
            //写入
            new Thread(() -> {
                myCache.put(temp + "", temp);
            }, String.valueOf(temp)).start();
            //读取
            new Thread(() -> {
                myCache.get(temp + "");
            }, String.valueOf(temp)).start();
        }
​
​
​
    }
}




//自定义缓存
class MyCacheLock {
   private volatile Map map = new HashMap();

   //读写锁
   private ReadWriteLock lock = new ReentrantReadWriteLock();

   //存,写的时候只能有1个线程操作。
   public void put(String key, Object value) {
       //开启写锁
       lock.writeLock().lock();
       try {
           System.out.println(Thread.currentThread().getName() + "开始写入");
           map.put(key, value);
           System.out.println(Thread.currentThread().getName() + "写入完成");
      } catch (Exception e) {
           e.printStackTrace();
      } finally {
           //释放写锁
           lock.writeLock().unlock();
      }
  }

   /*输出:
1开始写入
1写入完成---写是一定要获得锁,写入没完成其他线程不会开始写
1开始读取
1读取完成
2开始写入
2写入完成
2开始读取
4开始读取----读取不需要获得锁
4读取完成
3开始读取
3读取完成
2读取完成
3开始写入
3写入完成
4开始写入
4写入完成
5开始写入
5写入完成
5开始读取
5读取完成
*/

 

 

 

推荐阅读