首页 > 技术文章 > 并发容器

nicechen 2021-09-18 21:05 原文

第六章 并发容器

1、ConcurrentHashMap 和 CopyOnWriteArrayList 

1、ConcurrentHashMap 和 CopyOnWriteArrayList 简介

  1)取代同步的 HashMap 和同步的 ArrayList;

  2)绝大多数并发情况下,ConcurrentHashMap 和 CopyOnWriteArrayList 的性能都更好。

 

2、为什么 HashMap 是线程不安全的?

  1)同时 put 碰撞导致数据丢失;

    1. 如果有两个线程同时 put,且计算出来的哈希值一样的话,那么这两个 key 会放到同一个位置,就导致其中一个 key 会被丢弃,从而丢失数据。

  2)同时 put 扩容导致数据丢失;

    1. 如果多个线程同时 put 扩容,那么扩容之后也只会保存一个数据,导致其他线程的数据丢失。 

  3)死循环造成的 CPU 100%。

    1. HashMap 在高并发下的死循环(仅在 JDK7 及以前存在)

 

4、HashMap 关于并发的特点

  1)非线程安全;

  2)迭代时不允许修改内容;

  3)只读的并发是安全的;

  4)如果一定要把 HashMap 用在并发环境,用 Collections.synchronizedMap(new HashMap())。

 

5、ConcurrentHashMap(1.8)

  1)putVal 流程

    1. 判断 key value 不为空;
    2. 计算 hash 值;
    3. 根据对应位置节点的类型,来赋值,或者 helpTransfer,或者增长链表,或者给红黑树增加节点;
    4. 检查满足阈值就转化成红黑树;
    5. 返回oldVal。

  2)get 流程

    1. 计算 hash 值;
    2. 找到对应的位置;
    3. 如果直接找到,则返回;
    4. 如果是个红黑树,则在红黑树找;
    5. 如果是链表,则在链表找;
    6. 最后返回找到的结果。

 

  3)为什么超过阈值8,就要转换成红黑树?

    1. 一开始默认为是一个链表。红黑树的每一个节点占用的空间是链表的两倍,在空间是是比链表的损耗大;
    2. 想要达到冲突为8,正常情况下很难,只有0.00000006的概率,如果真的发生这种情况,转换成红黑树,可以确保在这种极端条件下的查询,依然保证一定的效率。

 

  4)组合操作

    1、replace:代码演示:

public class VectorDemo implements Runnable{
    public static ConcurrentHashMap<String,Integer> concurrentHashMap = new ConcurrentHashMap<>(5);
    public static void main(String[] args) throws InterruptedException {
        concurrentHashMap.put("表哥",0);
        VectorDemo demo = new VectorDemo();
        Thread thread1 = new Thread(demo);
        Thread thread2 = new Thread(demo);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(concurrentHashMap.get("表哥"));
    }

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            /*代码意思:
            因为在自增操作时非线程安全的,假设这两个线程同时读取到的 oldscord 的值都为0,然后为其加1,之后调用 replace,
            那么只有一个线程会替换成功,而下一个线程就让他重新循环一次
            */
            while (true){
                Integer oldscord = concurrentHashMap.get("表哥");
                //非线程安全
                Integer newScord = oldscord + 1;
                //线程安全,判断旧的值是否与新的值一样,一样则返回false,不一样则更新值
                boolean b = concurrentHashMap.replace("表哥", oldscord, newScord);
                if (b){
                    break;
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

    2、putIfAbsent(key,value):是否有该 key,如果没有,则它自动调用 put(key,value),如果有,则它自动调用 get(key),他的底层代码:

//如果不存在这个 key
if(!map.containsKey(key))
    //则添加这个 key 和对应的 value
    return map.put(key,value);
else
    //否则返回容器中这个 key 所对应的值
    return map.get(key);

 

 

6、CopyOnWriteArrayList 

  1)CopyOnWriteArrayList  诞生记

    1. 代替 Vector 和 SynchronizedList,就和 ConcurrentHashMap 代替 Vector 和 SynchronizedList 一样;
    2.  Vector 和 SynchronizedList 的锁粒度太大,并发效率相对比较低,并且迭代时无法编辑;
    3. Copy-On-Write 并发容器还包括 CopyOnWriteArraySet,用来代替同步 Set。

  2)CopyOnWriteArrayList   使用场景

    1. 读操作可以尽可能地快,而写即使慢一些也没有太大关系;
    2. 读多写少:例如监听器,迭代操作远多于修改操作。

  3)CopyOnWriteArrayList  读写规则

    1. 读写与读写锁一致的规则:读读共享、其他都互斥;
    2. 与读写锁不同的是,读取是完全不用加锁,写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。

简单示例:

/**
 * 演示 CopyOnWriteArrayList 可以在迭代的过程中修改数组内容,但是,迭代时不会把修改的数据迭代出来,而是打印出迭代前的数据 ArrayList 不行,会报错
 */
public class CopyOnWriteArrayListDemo {
    public static void main(String[] args) {
        //ArrayList<String> list = new ArrayList<>();
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Iterator<String> iterator = list.iterator();
        while(iterator.hasNext()){
            System.out.println("list is" + list);
            String next = iterator.next();
            System.out.println(next);

            if(next.equals("2")){
                list.remove("5");
            }
            if (next.equals("3")){
                list.add("3 found");
            }
        }
    }
}

 

 

7、CopyOnWriteArrayList 实现原理

  1)CopyOnWrite的含义

    1. 对计算机内存的数据进行修改时,先把这块内存的数据拷贝一份,把拷贝的东西放到新的内存中,然后在新的内存中把这个数据修改掉,指针在指向这个新的内存,旧的内存就被回收。

  2)创建新副本、读写分离

    1. 所有的修改操作,在底层是新建一个数组完成,对其修改时,把整个原有的数据进行复制,把修改的内容写入新的副本中,这是一个读写分离的思想,读和写使用完全不同的容器。

  3)“不可变”原理

    1. 在每一次修改的过程中,都是新建一个容器,对于旧的容器来说就是不可变的,因为旧的容器不会被操作,所以说旧的容器是不会变的,线程安全的,可以并发的读。

  4)迭代的时候

    1. 在迭代时,如果数组的原内容被修改过了,迭代器是不知道的,迭代器依然使用的是旧数组。

 

8、CopyOnWriteArrayList 的缺点

  1)数据一致性问题:CopyOnWrite 容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的数据,立马能读到,请不要使用 CopyOnWrite 容器;

  2)内存占用问题:因为 CopyOnWrite 的写是复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存。

 

2、并发队列 Queue:阻塞队列

 1、为什么要使用队列?

  1)用队列可以在线程间传递数据,不需要自己考虑线程安全问题。

 

2、并发队列简介

  1)Queue

    1. Queue 是用来保存一组等待处理的数据,Queue底层实现就是使用了 LinkedList 。

  2)BlockingQueue

    1. 扩展了 Queue,增加了可阻塞的插入和获取操作,如果队列为空,它获取的操作会一直阻塞,直到里面有了数据。如果队列满了,插入数据操作会一直阻塞,直到队列有空间可以让该数据进来。

 

3、什么是阻塞队列?

  1)阻塞队列是具有阻塞功能的队列,所以他首先是一个队列,其次是具有阻塞功能。

  2)通常,阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用。阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的。

  3)阻塞功能:最有特色的两个带有阻塞功能的方法是:

    1. take():获取并移除队列的头结点,一旦执行 take 的时候,如果队列里无数据,则阻塞,直到队列里有数据;
    2. put():插入元素。但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。

  4)是否有界(容量有多大),这是一个非常重要的属性,无界队列意味着里面可以容纳非常多(Integer.MAX_VALUE,可以近似认为是无限容量)。

  5)阻塞队列和线程池的关系:阻塞队列是线程池的重要组成部分。

 

4、BlockingQueue 主要方法

  1)put,take (以上有介绍,这里不在做介绍)

  2)add (与 put 相似往队列里放数据,但是队列满了,会报错)remove (如果队列是空的,会报错),element (返回队列中的头元素,如果是空的,会报错)

  3)offer(往队列里放数据,但是队列满了,返回 false),poll (在队里中取出数据并删除,如果为空,返回 null),peek(在队里中取出数据,如果为空,返回 null)

 

5、ArrayBlockingQueue

  1)他是一个有界的队列,创建时候需要制定一个容量,还可以指定是否保证公平,如果想保证公平的话,那么等待了最长时间的线程会被有限处理,不过这会同时带来一定的性能损耗。

  2)ArrayBlockingQueue基本使用示例:

public class ArrayBlockingQueueDemo {
    public static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);

    public static void main(String[] args) {
        Interviewer r1 = new Interviewer(queue);
        Consumer r2 = new Consumer(queue);
        new Thread(r1).start();
        new Thread(r2).start();

    }
}

class Interviewer implements Runnable{
    BlockingQueue<String> queue;

    public Interviewer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("10个候选人都来啦");
        for (int i = 0; i < 10; i++) {
            String candidate = "candidate" + i;
            try {
                queue.put(candidate);
                System.out.println("安排好了" + candidate);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            queue.put("stop");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable{
    BlockingQueue<String> queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String msg;
        try {
            while(!(msg = queue.take()).equals("stop")){
                System.out.println(msg + "到了");
            }
            System.out.println("所有候选人都结束了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

6、LinkedBlockingQueue

  1)他是一个无界的(容量为 Integer.MAX_VALUE)。内部结构:Node、两把锁。

 

7、PriorityBlockingQueue

  1)支持优先级;

  2)自然排序(不是先进先出),排序规则可以自定义;

  3)无界队列

 

7、SynchronousQueue

  1)容量为 0,SynchronousQueue不需要去持有元素,它所做的就是直接传递,所以效率比较高。

  2)SynchronousQueue 没有 peek 等方法,因为 peek 的含义是取出头结点,但是 SynchronousQueue 的容量是 0,所以连头结点都没有,也就没有 peek 方法。同理,也没有 iterate 相关方法。

  3)是一个极好的用来直接传递的并发数据结构。

  3)SynchronousQueue 是线程池 Executors.newCachedThreadPool() 勇士的阻塞队列。

 

8、DelayQueue

  1)延迟队列,根据延迟时间排序。

  2)无界(容量为 Integer.MAX_VALUE)。

  3)放进去的元素需要实现 Delayed 接口,这样它才能知道排序规则。

 

3、非阻塞并发队列

1、并发包中的非阻塞队列只有 ConcurrentLinkedQueue 这一种,ConcurrentLinkedQueue 是使用链表作为其数据结构的,使用 CAS 非阻塞算法来实现线程安全(不具备阻塞功能),适合用在对性能要求较高的并发场景。用的相对比较少一些。

 

4、如何选择队列

1、边界

2、空间

3、吞吐量

 

推荐阅读