首页 > 技术文章 > HashMap 与 ConcurrentHashMap 底层实现

zhengzhaoxiang 2020-11-20 00:10 原文

一、HashMap 底层源码


JDK7 版本(数组+链表)


我们存放的 hashMap 都会封装成一个节点对象 Entry(key,value),然后将此节点对象存放到一个数组中,存放前首先需要确定存放的数组下标:① 通过 hash(key) 算法得到 key 的 hashcode,并通过 hashcode的高16位和低16位进行异或操作(如果两个相应bit位相同,则结果为0,否则为1)得到32位的 int值,首先将高16位无符号右移16位与低十六位做异或运算。如果不这样做,而是直接做&运算(相同位的两个数字都为1,则为1;若有一个不为1,则为0)那么高十六位所代表的部分特征就可能被丢失 将高十六位无符号右移之后与低十六位做异或运算使得高十六位的特征与低十六位的特征进行了混合得到的新的数值,这样高位与低位的信息都被保留了 。② int值再与(数组长度-1:底位全为1,高位全为0)进行位运算,获取要存放的下标;③ 如果②中得到相同的值时,判断 key值是否相同,如果相同则新value替换旧value。如果key不相同,将value以链表的形式存放在同一个数组下标下,为了提高存放的速度,新的数据,将存放在原链表的头部。即新数据的 next 指向链表的头元素即可。需要注意的是,每次给链表的头部插入一个新的元素之后,需要将链表的头元素赋值给 table 的下标值。代码展示为 :

table[index] = Entry(key,value,table[index]);

画个图理解下:

源码分析:【1】JDK7 中 HashMap 的重要属性和构造器源码展示:用户创建 HashMap 时调用有参构造器时,表示用户自定义数组的大小,但是 hashMap 会判断其是否为2的幂次数,如果不是则将其改为该值的下一个2的幂次数,这是一种非常规的设计,常规的设计是把桶的大小设计为素数。相对来说素数导致冲突的概率要小于合数(例如:用户自定义为15,HashMap 会将其修改为 16)。初始化一个 table 的对象 Entry 其容量如果用户传入则为该值的下一个2的幂等数,负责为默认值16;具体代码展示如下:

 1 public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable
 2 {
 3     // 默认初始容量-必须是2的幂。
 4     static final int DEFAULT_INITIAL_CAPACITY = 16;
 5     // 最大容量 1<<30.
 6     static final int MAXIMUM_CAPACITY = 1 << 30;
 7     // 加载因子
 8     static final float DEFAULT_LOAD_FACTOR = 0.75f;
 9     // 表,根据需要调整大小。长度必须始终是2的幂
10     transient Entry<K,V>[] table;
11     
12     //带参数的构造器,用户可以指定初始化的大小
13     public HashMap(int initialCapacity) {
14         this(initialCapacity, DEFAULT_LOAD_FACTOR);
15     }
16 
17     //不带参数的构造器,默认的table 大小为16
18     public HashMap() {
19         this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);
20     }
21     
22     //具体初始化的构造器
23     public HashMap(int initialCapacity, float loadFactor) {
24         //这里省略校验的代码......
25 
26         /** 如果 initialCapacity 大于 capacity(2的幂次数)就将 capacity 向左移一位,直到 capacity(2的幂次数) > 
27          * initialCapacity . 如果 initialCapacity 使用户传进来的 table 的大小,那么此时table 
28          * 的大小就是传进来值最接近的大于当前数的2的幂次数
29      */
30         int capacity = 1;
31         while (capacity < initialCapacity)
32             capacity <<= 1;
33 
34         this.loadFactor = loadFactor;
35         //创建一个Entry 对象,大小为 capacity
36         table = new Entry[capacity];
37     }
38 }

【2】进入 HashMap 的 put 方法添加元素的源码展示:方法中嵌套的方法,会单独进行说明。例如 hash(key)方法等等。

 1 public V put(K key, V value) {
 2         //如果 key 是空进入 putForNullKey 方法:将key固定放在第0个位置
 3     if (key == null)
 4         return putForNullKey(value);
 5         //对 key 进行 hash 计算
 6     int hash = hash(key);
 7     int i = indexFor(hash, table.length);
 8         /** 举个栗子:如果我们想 map 中存入了一个值 map.put(1,2) 此时我继续存入相同的 key
 9           *  int oldVlue = map.put(1,3) 时,会返回一个值,是我之前存放的那个 2也就是当前key的旧值
10           * 如下代码就是对上述问题的实现,会遍历当前的链表,如果hash值相同则覆盖当前值,并返回旧值
11           */
12     for (Entry<K,V> e = table[i]; e != null; e = e.next) {
13         Object k;
14         if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
15             V oldValue = e.value;
16             e.value = value;
17             e.recordAccess(this);
18             return oldValue;
19         }
20     }
21 
22     modCount++;
23     addEntry(hash, key, value, i);
24     return null;
25 }

【3】通过 hash 算法对 key 值进行计算源码展示:简单理解为,计算 key 的 hash 值即可。后面的那些位运算的目的都是为了提高元素分布的均匀性。如果对于链表来说,如果太长的话,查询还是比较影响性能的。

 1 final int hash(Object k) {
 2     int h = 0;
 3         //先获取 key 的hashcode 值
 4     h ^= k.hashCode();
 5 
 6     // 此函数确保只有在每个位位置的常数倍数有界
 7     // 碰撞次数(在默认负载因子下约为8)
 8         // 看不懂,就这样理解,主要是为了减少数据的重合问题
 9     h ^= (h >>> 20) ^ (h >>> 12);
10     return h ^ (h >>> 7) ^ (h >>> 4);
11 }

【4】indexFor 的源码展示:获取 key 要存放的 table 的下标,这里是通过 ‘与’ 运算符进行计算。也是上面初始化容量为什么要使用2的幂次数的原因。举个栗子:16使用二进制表示为 0001 0000 此时 16-1 则为 0000 1111 那么此时 key 的 hash 值与 length-1 的 ‘与’ 操作,只会出现在 0-15 之间。但是如果与 16 ‘与’ 则不能达到这种效果,主要是因为它的底四位不全是 1。类似于我们的取‘模’操作。

1 static int indexFor(int h, int length) {
2     return h & (length-1);
3 }

【5】进入 addEntry 方法:首先判断是否需要扩容,如果要对数组进行扩容,肯定是新创建一个数组(扩容底层是Arrays.copyOf实现的),将原数组的值全都复制到新的数组当中。此时,会出现一个死循环的问题,就是当调用 transfer 方法进行数组赋值的时候。如果当前数组的下标,存在>2的链表时,多线程的情况下,就会出现死循环。原因是因为,我们在复制链表值的时候,会将链表的顺序进行调换。第一个用户进去后复制完后,基于第一个用户的结果,第二个用户继续复制时,就会发生死循环。你可以画个图玩玩。解决办法:就是不让它扩容,设置自己初始化一个能控制的容量大小即可。

 1 void addEntry(int hash, K key, V value, int bucketIndex) {
 2         //扩容:当元素的大小 > 容量*负载系数,第二个条件是当你当前要存储的对象数组下标非空的时候
 3         // 条件二在JDK8 中不存在
 4     if ((size >= threshold) && (null != table[bucketIndex])) {
 5                 // 扩容方法,将容量扩展为原大小的2倍。代码源码会粘在下方
 6         resize(2 * table.length);
 7         hash = (null != key) ? hash(key) : 0;
 8         bucketIndex = indexFor(hash, table.length);
 9     }
10         
11         //就是我们说的,链表下移的实现
12         //代码粘在下方
13     createEntry(hash, key, value, bucketIndex);
14 }
15 
16 //扩容方法
17 void resize(int newCapacity) {
18     Entry[] oldTable = table;
19     int oldCapacity = oldTable.length;
20     if (oldCapacity == MAXIMUM_CAPACITY) {
21         threshold = Integer.MAX_VALUE;
22         return;
23     }
24         
25         //重点,创建了一个新的数组
26     Entry[] newTable = new Entry[newCapacity];
27     boolean oldAltHashing = useAltHashing;
28     useAltHashing |= sun.misc.VM.isBooted() &&
29             (newCapacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
30     boolean rehash = oldAltHashing ^ useAltHashing;
31         //该方法将数据进行复制到新的数组中。重点:这个地方是线程不安全的,代码源码我粘在下方:
32     transfer(newTable, rehash);
33     table = newTable;
34     threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
35 }
36 
37 /**
38  * 将所有条目从当前表传输到newtable。多线程的情况下,会出现死循环。有人也称为死锁。
39  */
40 void transfer(Entry[] newTable, boolean rehash) {
41     int newCapacity = newTable.length;
42         //循环遍历数组中的数据
43     for (Entry<K,V> e : table) {
44                 //循环遍历,链表中的数据,此方法非线程安全。
45                 // 原因:扩容前后链表的顺序是相反的,多线程就会导致这个问题。
46         while(null != e) {
47             Entry<K,V> next = e.next;
48             if (rehash) {
49                 e.hash = null == e.key ? 0 : hash(e.key);
50             }
51             int i = indexFor(e.hash, newCapacity);
52             e.next = newTable[i];
53             newTable[i] = e;
54             e = next;
55         }
56     }
57 }
58 
59 //先获取当前下标的数据,然后将数据插入链表头部,并将链表下移
60 void createEntry(int hash, K key, V value, int bucketIndex) {
61     Entry<K,V> e = table[bucketIndex];
62     table[bucketIndex] = new Entry<>(hash, key, value, e);
63     size++;
64 }

JDK8 版本(红黑树)


因为上面说的 HashMap 可能会存在一个很长的链表。HashMap 的 get性能就会出现问题。JDK8就是将长的链表改为了一颗红黑树(二叉树)和扩容的优化等,能够提高 HashMap 的查询效率。红黑树链接

源码分析:【1】 HashMap类中有一个非常重要的字段,就是 Node[] table,即哈希桶数组,明显它是一个 Node的数组。我们来看 Node[JDK1.8]是何物。如下,Node是 HashMap的一个内部类,实现了 Map.Entry接口,本质是就是一个映射(键值对)。上图中的每个黑色圆点就是一个 Node对象。

 1 static class Node<K,V> implements Map.Entry<K,V> {
 2         final int hash;    //用来定位数组索引位置
 3         final K key;
 4         V value;
 5         Node<K,V> next;   //链表的下一个node
 6 
 7         Node(int hash, K key, V value, Node<K,V> next) { ... }
 8         public final K getKey(){ ... }
 9         public final V getValue() { ... }
10         public final String toString() { ... }
11         public final int hashCode() { ... }
12         public final V setValue(V newValue) { ... }
13         public final boolean equals(Object o) { ... }
14 }

【2】主要看 HashMap 的 put 方法:hash(key) 的算法与 JDK7 中有点区别:少了很多的位运算,主要是 JDK8中采用了红黑树,能够分担一些查询压力。当链表的长度>=8的时候转为红黑树,当红黑树 <= 6 的时候会转为链表。

1 public V put(K key, V value) {
2     return putVal(hash(key), key, value, false, true);
3 }
4 
5 //hash(key)的算法,就放在此处
6 static final int hash(Object key) {
7     int h;
8     return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
9 }

【3】进入 putVal 方法,源码如下:首先会判断 table 是否为空。如果存放的 key,table中已经存在,则将旧值返回,存入新值。如果当前的需要存放的节点是 TreeNode,则存放在红黑树中。否则存放在链表中(且存放在链表的尾部,也就不存在扩容时的死循环问题),存放前需要对链表的长度进行判断,判断是否大于等于默认值8。如果是的话,就将链表转化为红黑树方式存放。最后判断是否需要对 table 进行扩容操作(链表扩容或者红黑树扩容)。

 1 final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
 2                boolean evict) {
 3     Node<K,V>[] tab; Node<K,V> p; int n, i;
 4     //判断table是不是为空
 5     if ((tab = table) == null || (n = tab.length) == 0)
 6         //对table进行扩容
 7         n = (tab = resize()).length;
 8         //如果数组的当前值为空,则直接存放即可
 9     if ((p = tab[i = (n - 1) & hash]) == null)
10         tab[i] = newNode(hash, key, value, null);
11     else {
12         Node<K,V> e; K k;
13         //判断key 是否已存在
14         if (p.hash == hash &&
15             ((k = p.key) == key || (key != null && key.equals(k))))
16             e = p;
17         //如果是TreeNode 节点,则存放在红黑树中
18         else if (p instanceof TreeNode)
19             e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
20         //存放在链表中(且存放在链表的尾部),存放前需要对链表的长度进行判断,判断是否大于等于默认值8。如果是的话,就将链表转化为红黑树方式存放。
21         else {
22             for (int binCount = 0; ; ++binCount) {
23                 if ((e = p.next) == null) {
24                     //将数据放在链表的尾部
25                     p.next = newNode(hash, key, value, null);
26                     if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
27                         treeifyBin(tab, hash);
28                     break;
29                 }
30                 if (e.hash == hash &&
31                     ((k = e.key) == key || (key != null && key.equals(k))))
32                     break;
33                 p = e;
34             }
35         }
36         if (e != null) { // 存在相同的key 则返回旧值,存入新值
37             V oldValue = e.value;
38             if (!onlyIfAbsent || oldValue == null)
39                 e.value = value;
40             afterNodeAccess(e);
41             return oldValue;
42         }
43     }
44     ++modCount;
45     //判断是都需要扩容
46     if (++size > threshold)
47         /*扩容方法:就不粘源码了,还是有点难度的。简单说下规则:举个栗子,如果当前的table为16(0001 0000)
48                 * 我们计算下标时,将hash 值与15(0000 1111)进行&运算。
49                 * 如果对16扩容,则得到32(0010 0000),此时hash 与 31(0001 1111)进行&运算。这里有个规律就是
50                 * 当前key的下标值,要么与扩容前的一样,要么等于 index+oldTable.length(旧的table的长度)
51                 * 原因是因为现在参加运算的 0001 1111 与之前的 0000 1111 值刚好是一倍的关系
52                 * 此时key就算的hash值的第五位如果是0 则表示与原key 的index一样,如果是1,则是 index+oldTable.length 也就是这里的16。
53                 */
54         resize();
55     afterNodeInsertion(evict);
56     return null;
57 }

 HashTable 能够解决 HashMap 线程不安全的问题。问题是它给所有的方法都加了 synchronized 同步代码块,严重影响系统的性能。

二、HashMap 的数据插入流程


【1】判断键值对数组 table[i]是否为空或为null,否则执行resize()进行扩容;
【2】根据键值key计算hash值得到插入的数组索引i,如果table[i]==null,直接新建节点添加,转向⑥,如果table[i]不为空,转向③;
【3】判断table[i]的首个元素是否和key一样,如果相同直接覆盖value,否则转向④,这里的相同指的是hashCode以及equals;
【4】判断table[i] 是否为treeNode,即table[i] 是否是红黑树,如果是红黑树,则直接在树中插入键值对,否则转向⑤;
【5】遍历table[i],判断链表长度是否大于8,大于8的话把链表转换为红黑树,在红黑树中执行插入操作,否则进行链表的插入操作;遍历过程中若发现key已经存在直接覆盖value即可;
【6】插入成功后,判断实际存在的键值对数量 size是否超多了最大容量 threshold,如果超过,进行扩容。

当hashmap中的元素个数超过数组大小*loadFactor时,就会进行数组扩容,loadFactor的默认值为0.75,也就是说,默认情况下,数组大小为16,那么当 hashmap中元素个数超过16*0.75=12的时候,就把数组的大小扩展为 2*16=32,即扩大一倍,然后重新计算每个元素在数组中的位置,而这是一个非常消耗性能的操作,所以如果我们已经预知 hashmap中元素的个数,那么预设元素的个数能够有效的提高 hashmap的性能。比如说,我们有1000个元素 new HashMap(1000),但是理论上来讲 new HashMap(1024)更合适,即使是1000,hashmap也自动会将其设置为1024。 但是 new HashMap(1024)还不是更合适的,因为0.75*1000 < 1000,也就是说为了让 0.75 * size > 1000,我们必须这样 new HashMap(2048)才最合适,既考虑了&的问题,也避免了 resize的问题。 

三、HashMap 的哈希函数怎么设计的


hash 函数是先通过 key 的 hashcode 的到 32位的 int值,然后让 hashcode 的高16位与低16位进行异或操作。这也叫扰动函数,设计的原因有如下两点:
【1】能够降低 hash碰撞,越分散越好:因为 key.hashCode() 函数调用的是 key键值类型自带的哈希函数,返回 int型散列值。int值范围为[-2^31~2^31-1],前后加起来大概 40亿的映射空间。只要哈希函数映射得比较均匀松散,一般应用是很难出现碰撞的。但问题是一个40亿长度的数组,内存是放不下的。右位移16位,正好是 32bit的一半,自己的高半区和低半区做异或,就是为了混合原始哈希码的高位和低位,以此来加大低位的随机性。而且混合后的低位掺杂了高位的部分特征,这样高位的信息也被变相保留下来。最后我们来看一下 Peter Lawley 的一篇专栏文章《An introduction to optimising a hashing strategy》里的的一个实验:他随机选取了352个字符串,在他们散列值完全没有冲突的前提下,对它们做低位掩码,取数组下标。

结果显示,当HashMap 数组长度为 512的时候(2^9 ),也就是用掩码取低9位的时候,在没有扰动函数的情况下,发生了103次碰撞,接近30%。而在使用了扰动函数之后只有92次碰撞。碰撞减少了将近10%。看来扰动函数确实还是有功效的。另外Java1.8 相比1.7做了调整,1.7做了四次移位和四次异或,但明显Java 8觉得扰动做一次就够了,做4次的话,多了可能边际效用也不大,所谓为了效率考虑就改成一次了。
【2】算法一定要尽可能高效,因为这是高频操作, 因此采用位运算

四、JDK1.8 对比 JDK1.7 做了哪些优化


【1】数组+链表改成了数组+链表或红黑树:防止发生 hash冲突,链表长度过长,将时间复杂度由 O(n)降为 O(logn);
【2】链表的插入方式从头插法改成了尾插法:插入时如果数组位置上已经有元素,1.7 将新元素放到数组中,原始节点作为新节点的后继节点,1.8 则遍历链表,将元素放置到链表的最后;因为 1.7头插法扩容时,头插法会使链表发生反转,多线程环境下会产生环;A线程在插入节点B,B线程也在插入,遇到容量不够开始扩容,重新 hash,放置元素,采用头插法,后遍历到的B节点放入了头部,这样形成了环,如下图所示:

【3】扩容的时候1.7需要对原数组中的元素进行重新hash定位在新数组的位置,1.8采用更简单的判断逻辑,位置不变或索引+旧容量大小:只需要看看原来的 hash值新增的那个 bit是 1还是 0就好了,是 0的话索引没变,是 1的话索引变成“原索引+oldCap”。

这个设计确实非常的巧妙,既省去了重新计算 hash值的时间,而且同时,由于新增的 1bit是0还是1可以认为是随机的,因此resize的过程,均匀的把之前的冲突的节点分散到新的 bucket了。这一块就是JDK1.8 新增的优化点。有一点注意区别,JDK1.7中 rehash的时候,旧链表迁移新链表的时候,如果在新表的数组索引位置相同,则链表元素会倒置,但是从上图可以看出,JDK1.8不会倒置。

【4】在插入时,1.7先判断是否需要扩容,再插入,1.8先进行插入,插入完成再判断是否需要扩容
【5】在多线程的情况下1.7 会产生死循环、数据丢失、数据覆盖的问题,1.8 中会有数据覆盖的问题

五、平常怎么解决线程不安全问题


Java 中有 HashTable、Collections.synchronizedMap、以及 ConcurrentHashMap可以实现线程安全的Map。HashTable是直接在操作方法上加 synchronized关键字,锁住整个数组,粒度比较大,Collections.synchronizedMap是使用 Collections集合工具的内部类,通过传入Map封装出一个 SynchronizedMap对象,内部定义了一个对象锁,方法内通过对象锁实现;ConcurrentHashMap JKD1.7使用分段锁,降低了锁粒度,让并发度大大提高,1.8 使用 CAS自旋锁,保证线程安全。

六、ConcurrentHashMap 底层源码


JDK7 版本(Segment+ReentrenLock)


分段锁机制思想(了解):简而言之,ConcurrentHashMap 在对象中保存了一个 Segment 数组,即将整个 Hash表划分为多个分段;而每个 Segment元素,即每个分段类似于一个Hashtable;在执行 put 操作时首先根据 hash算法定位到元素属于哪个 Segment,然后对该 Segment 加锁即可。因此 ConcurrentHashMap 在多线程并发编程中是线程安全的。简单的原理图如下:一个 Segment 管理一个 Entry 数组(2的幂次数)。 但是其最大并发度受 Segment 的个数限制。因此,在JDK1.8中,ConcurrentHashMap 的实现原理摒弃了这种设计,而是选择了与HashMap 类似的数组+链表+红黑树的方式实现,而加锁则采用 CAS 和 synchronized 实现。

源码分析:【1】ConcurrentHashMap 的初始化工作:从构造器中可以看出,首先会初始化一个 segment 数组和 segment 中包含的 entry 数组。

 1 //创建具有默认初始容量(默认 16)的新空映射,负载系数(0.75)和并发级别(默认 16)。
 2 public ConcurrentHashMap() {
 3         //源码粘在下方
 4     this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
 5 }
 6 
 7 //使用指定的初始值创建新的空映射,容量、负载因子和并发级别。
 8 public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
 9     //省略校验部分......
10     
11     //将传入的值修改为 2的幂次数(最接近当前值的幂次数:15时取16(2^4))
12     while (ssize < concurrencyLevel) {
13         ++sshift;
14         ssize <<= 1;
15     }
16     this.segmentShift = 32 - sshift;
17     this.segmentMask = ssize - 1;
18     if (initialCapacity > MAXIMUM_CAPACITY)
19         initialCapacity = MAXIMUM_CAPACITY;
20     
21     //计算一个segment 中存放的 entry 对象
22     int c = initialCapacity / ssize;
23     if (c * ssize < initialCapacity)
24         ++c;
25     int cap = MIN_SEGMENT_TABLE_CAPACITY;
26     //对计算出来的 c 取2的幂次数
27     while (cap < c)
28         cap <<= 1;
29     //创建segments和segments[0]:一个segments 就是一个HashMap里面存放的是 Entry[] 数组大小时计算的 cap
30     Segment<K,V> s0 =
31         new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
32                          (HashEntry<K,V>[])new HashEntry[cap]);
33     Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
34     UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
35     this.segments = ss;
36 }

【2】下面我们看下 put 方法的源码:比较新鲜的地方就是,首先会计算出当前 key 的 hash 值位于哪个 segment 数组下标。然后获取这个 segment 对象。进行put 操作,此时这个 put 操作时加了重入锁的,后续的操作与 JDK 中的 HashMap 都是一样的了。如果不存在直接存放在 Entry[] 数组中,否则存放在链表中。

 1 public V put(K key, V value) {
 2     Segment<K,V> s;
 3     //首先计算key的hash 值
 4     int hash = hash(key);
 5     //计算出 hash 值所在的 segment 
 6     int j = (hash >>> segmentShift) & segmentMask;
 7     //UNSAFE.getObject 意思是从 segments 的下标 j 中获取对象 。
 8     if ((s = (Segment<K,V>)UNSAFE.getObject          // 通过 UNSAFE 操作时线程安全的
 9          (segments, (j << SSHIFT) + SBASE)) == null) 
10         //如果segment是空,则创建一个新的 segment
11         s = ensureSegment(j);
12     //给segment中添加元素,源码粘在下方
13     return s.put(key, hash, value, false);
14 }
15 
16 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
17     //加锁 (segment 继承了可重入锁)
18     HashEntry<K,V> node = tryLock() ? null :
19         scanAndLockForPut(key, hash, value);
20     V oldValue;
21     
22     //底下的方法,主要就是给segment 管理的那部分 entry[]数组中添加元素。如果存在则已链表的形式存放
23     try {
24         HashEntry<K,V>[] tab = table;
25         int index = (tab.length - 1) & hash;
26         HashEntry<K,V> first = entryAt(tab, index);
27         for (HashEntry<K,V> e = first;;) {
28             if (e != null) {
29                 K k;
30                 if ((k = e.key) == key ||
31                     (e.hash == hash && key.equals(k))) {
32                     oldValue = e.value;
33                     if (!onlyIfAbsent) {
34                         e.value = value;
35                         ++modCount;
36                     }
37                     break;
38                 }
39                 e = e.next;
40             }
41             else {
42                 if (node != null)
43                                         //将新的node添加在链表头部
44                     node.setNext(first);
45                 else
46                     node = new HashEntry<K,V>(hash, key, value, first);
47                 int c = count + 1;
48                 if (c > threshold && tab.length < MAXIMUM_CAPACITY)
49                     rehash(node);
50                 else
51                     setEntryAt(tab, index, node);
52                 ++modCount;
53                 count = c;
54                 oldValue = null;
55                 break;
56             }
57         }
58     } finally {
59         //释放锁
60         unlock();
61     }
62     return oldValue;
63 }

JDK8 版本(数组+链表+红黑树)


去除了segment 片段锁机制。思想是给 table的每一个下标都加锁,也就是当对下标进行操作时都会加锁(CAS+Synchronize)。ConcurrentHashMap 成员变量使用 volatile 修饰,免除了指令重排序,同时保证内存可见性,另外使用 CAS操作和 synchronized结合实现赋值操作,多线程操作只会锁住当前操作索引的节点。

源码分析:主要查看 put 方法的实现:

 1 final V putVal(K key, V value, boolean onlyIfAbsent) {
 2     //计算hash 值
 3     int hash = spread(key.hashCode());
 4     int binCount = 0;
 5     for (Node<K,V>[] tab = table;;) {
 6         Node<K,V> f; int n, i, fh;
 7         if (tab == null || (n = tab.length) == 0)
 8             //初始化 table
 9             tab = initTable();
10         //添加到空箱子时没有锁定
11         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
12             //new 一个node 节点,放置在 i 下标下,通过 CAS 循环锁,安全放置
13             if (casTabAt(tab, i, null,
14                          new Node<K,V>(hash, key, value, null)))
15                 break;                   
16         }
17         //动态扩容
18         else if ((fh = f.hash) == MOVED)
19             tab = helpTransfer(tab, f);
20         else {
21             V oldVal = null;
22             //*******************重点*********************
23             //同步 f 就是链表的头元素
24             synchronized (f) {
25                 //判断 f 是否为当前的偏移量的值(也就是没有别人对此值进行了修改操作)否则直接退出同步
26                 if (tabAt(tab, i) == f) {
27                     //链表操作
28                     if (fh >= 0) {
29                         binCount = 1;
30                         for (Node<K,V> e = f;; ++binCount) {
31                             K ek;
32                             if (e.hash == hash &&
33                                 ((ek = e.key) == key ||
34                                  (ek != null && key.equals(ek)))) {
35                                 oldVal = e.val;
36                                 if (!onlyIfAbsent)
37                                     e.val = value;
38                                 break;
39                             }
40                             Node<K,V> pred = e;
41                             if ((e = e.next) == null) {
42                                 pred.next = new Node<K,V>(hash, key,
43                                                           value, null);
44                                 break;
45                             }
46                         }
47                     }
48                     //树化操作
49                     else if (f instanceof TreeBin) {
50                         Node<K,V> p;
51                         binCount = 2;
52                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
53                                                        value)) != null) {
54                             oldVal = p.val;
55                             if (!onlyIfAbsent)
56                                 p.val = value;
57                         }
58                     }
59                 }
60             }
61             if (binCount != 0) {
62                 if (binCount >= TREEIFY_THRESHOLD)
63                     treeifyBin(tab, i);
64                 if (oldVal != null)
65                     return oldVal;
66                 break;
67             }
68         }
69     }
70     addCount(1L, binCount);
71     return null;
72 }

推荐阅读