Map(4-ConcurrentHashMap)
ConcurrentHashMap1.7
属性和接口
ConcurrentHashMap1.7 采用了数组+Segment+分段锁的方式实现,即 ConcurrentHashMap 把哈希桶数组切分成小数组(Segment),每个小数组有 n 个 HashEntry 组成
一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的个数一旦初始化就不能改变。 Segment 数组的大小默认是 16,也就是说默认可以同时支持 16 个线程并发写。
Segment 的结构和 HashMap 类似,是一种数组和链表结构,一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment 的锁。也就是说,对同一 Segment 的并发写入会被阻塞,不同 Segment 的写入是可以并发执行的。
1 | public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> |
1 | static final class Segment<K,V> extends ReentrantLock implements Serializable { |
1 | static final class HashEntry<K,V> { |
构造方法
1 | public ConcurrentHashMap() { |
- 进行参数校验,成功则进行下面的操作
- 校验并发级别 concurrencyLevel 大小,如果大于最大值,重置为最大值,无参构造默认值是 16
- 寻找并发级别 concurrencyLevel 之上最近的 2 的幂次方值(ssize),作为初始化容量大小,默认是 16
- 记录 segmentShift 偏移量(segmentShift),这个值为【容量 = 2 的 N 次方】中的 N,在后面 put 时计算位置时会用到。默认是 32-sshift = 28
- 记录 segmentMask 段掩码(segmentMark = 2^n-1),默认是 ssize-1 = 16-1 = 15
- 计算每个 Segment 中的类似于 HashMap 的容量 c
- Segment 中的类似于 HashMap 的容量 c 必须是 2 的倍数,计算出大于 c 的 2 的 n 次方(cap)
- 初始化 segment [0],默认大小为 2,负载因子为 0.75,扩容阈值为 2 *0.75 = 1.5(cap* loadFactor),插入第二个值才会进行扩容
插入
1 | public V put(K key, V value) { |
- 计算要 put 的 key 的位置,获取指定位置的 Segment
- 如果指定位置的 Segment 为空,则初始化这个 Segment
- Segment.put 插入 key,value 值
初始化 Segment 流程:
- 检查计算得到的位置的 Segment 是否为 null
- 为 null 则继续初始化,使用 Segment [0] 的容量和负载因子创建一个 cap 容量的 HashEntry 数组
- 再次检查计算得到的指定位置的 Segment 是否为 null,因为这时可能有其他线程进行了操作
- 为 null 则使用创建的 HashEntry 数组初始化这个 Segment
- 自旋判断计算得到的指定位置的 Segment 是否为 null,是的话使用 CAS 在这个位置赋值为 Segment
1 | final V put(K key, int hash, V value, boolean onlyIfAbsent) { |
如果指定的 Segment 不为空,由于 Segment 继承了 ReentrantLock,所以 Segment 内部可以很方便地获取锁
- tryLock()获取锁,获取不到使用 scanAndLockForPut 方法继续获取
- 计算 put 的数据要放入的 index 位置,然后获取这个位置上的 HashEntry
- 遍历 put 新元素,因为这里获取的 HashEntry 可能是一个空元素,也可能是链表已存在,所以要区别对待
如果这个位置上的 HashEntry 不存在:
- 如果当前容量大于扩容阈值,小于最大容量,进行扩容
- 直接头插法插入
如果这个位置上的 HashEntry 存在:
- 判断链表当前元素 key 和 hash 值是否和要 put 的 key 和 hash 值一致,一致则替换值;不一致则获取链表下一个节点,直到发现相同进行值替换,或者链表里没有相同的
- 如果要插入的位置之前已经存在,替换后返回旧值,否则返回 null
scanAndLockForPut(自旋)
在不超过最大重试次数 MAX_SCAN_RETRIES 通过 CAS 获取锁
1 | private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { |
rehash(扩容)
ConcurrentHahsMap 的扩容只会扩容到原来的两倍。老数组里的数据移动到新的数组时,位置要么不变,要么变为 index+oldSize,参数里的 node 会在扩容之后使用链表头插法插入到指定位置
1 | private void rehash(HashEntry<K,V> node) { |
查找
1 | public V get(Object key) { |
- 通过 key 值的 hash 值定位到对应 Segment 对象,再通过 hash 值定位到具体的 entry 对象
- 遍历链表,通过 equals 取出数据
- 由于 HashEntry 中的 value 属性是用 volatile 关键字修饰的,保证了内存可见性,所以每次获取时都是最新值,整个过程不需要加锁
删除
1 | public V remove(Object key) { |
scanAndLock(自旋)
扫描是否含有指定的 key 并且获取同步锁,当方法执行完毕,即跳出循环成功获取到同步锁,跳出循环的方式:
- tryLock 方法尝试获取独占锁成功
- 尝试获取超过最大自旋次数 MAX_SCAN_RETRIES 线程堵塞,当线程从等待队列中被唤醒获取到锁跳出循环
1 | private void scanAndLock(Object key, int hash) { |
ConcurrentHashMap1.8
属性和接口
ConcurrentHashMap 采用了数组+链表+红黑树的实现方法,内部大量采用 CAS 操作,只需要这个链表头节点(红黑树的根节点),就不会影响其他的哈希桶数组元素的读写
1 | public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> |
构造方法
1 | public ConcurrentHashMap() { |
在调用构造方法创建 ConcurrentHashMap 对象时,只是根据传入参数计算桶数组初始长度赋值给 sizeCtl,并没有初始化 table 数组,只有在插入元素时才用 initTable 方法进行延迟加载
initTable(初始化)
1 | private final Node<K,V>[] initTable() { |
ConcurrentHashMap 的初始化是通过自旋和 CAS 操作完成的。里面需要注意的是变量 sizeCtl,它的值决定着当前的初始化状态
- sizeCtl =-1:表示有线程正在进行真正的初始化操作
- sizeCtl =-(1+nThreads):表示有 nThreads 个线程正在扩容操作
- sizeCtl > 0:如果 table 数组还没有初始化,这就是初始化的长度;如果已经初始化了,sizeCtl 是 table 数组长度的 0.75 倍,代表扩容阈值
- sizeCtl = 0:默认值,此时在真正的初始化操作中使用默认容量
插入
1 | public V put(K key, V value) { |
- 根据 key 计算出 hashCode,判断下面的 4 种情况
- 判断是否需要进行初始化,如果桶数组为空,初始化桶数组(CAS+自旋,即执行 initTable 方法)
- 判断当前 key 定位出的 Node,如果桶数组中的元素为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋,保证成功
- 如果当前位置的 hashCode = MOVED(-1),说明其他线程在扩容,则需要进行扩容
- 如果都不满足,则利用 synchronized 锁写入数据
- 如果数量大于 TREEIFY_THRESHOLD 则要执行树化方法,在 treeifyBin 中会首先判断当前数组长度 >= 64 时才会将链表转换为红黑树
- 判断是否需要扩容,如果超过了临界值就需要扩容
触发扩容的情况
1 | //新增元素时,也就是在调用 putVal 方法后,为了通用,增加了个 check 入参,用于指定是否可能会出现扩容的情况 |
1 | //扩容状态下其他线程对集合进行插入、修改、删除、合并、compute等操作时遇到 ForwardingNode 节点会调用该帮助扩容方法 (ForwardingNode 后面介绍) |
1 | //putAll批量插入或者插入节点后发现链表长度达到8个或以上,但数组长度为64以下时触发的扩容会调用到这个方法 |
- 在调用 addCount 方法增加集合元素计数后发现当前集合元素个数到达扩容阈值时就会触发扩容
- 扩容状态下其他线程对集合进行插入、修改、删除、合并、compute 等操作时遇到 ForwardingNode 节点会触发扩容
- putAll 批量插入或者插入节点后发现存在链表长度达到 8 个以上,但数组长度为 64 以下时会触发扩容
扩容方法
调用该扩容方法的地方:
- addCount:向集合中插入新数据后更新容量计数时发现到达扩容阈值而触发的扩容
- helpTransfer:扩容状态下其他线程对集合进行插入、修改、删除、合并、compute 等操作时遇到 ForwardingNode 节点时触发的扩容
- tryPresize:putAll 批量插入或者插入后发现链表长度达到 8 个或以上,但数组长度为 64 以下时触发的扩容
1 | private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { |
- 扩容期间在未迁移到的 hash 桶插入数据会发生什么?
只要插入的位置扩容线程还未迁移到,就可以插入,当迁移到该插入的位置时,就会阻塞等待插入操作完成再继续迁移
- 正在迁移的 hash 桶遇到 get 操作会发生什么?
在扩容过程期间形成的 hn 和 ln 链是使用的类似于复制引用的方式,即 ln 和 hn 链是复制出来的,而非原来的链表迁移过去的,所以原来 hash 桶上的链表并没有受到影响,因此从迁移开始到迁移结束这段时间都是可以正常访问原数组 hash 桶上面的链表,迁移结束后放置上 fwd,往后的询问请求就直接转发到扩容后的数组去了
- 如果 lastRun 节点正好在一条全部都为高位或者都为低位的链表上,会不会形成死循环?
在数组长度为 64 之前会导致一直扩容,但是到了 64 或者以上后就会转换为红黑树,因此不会一直死循环
- 扩容后 ln 和 hn 链不用经过 hash 取模运算,分别被直接放置在新数组的 i 和 n+1 的位置上,那么如何保证这种方式依旧可以用过 h&(n-1)正确算出 hash 桶的位置?
如果 fh&(n-1)= i,那么扩容之后的 hash 计算方法应该是 fh&(2n-1),因此 n 是 2 的幂次方数组,所以如果 n = 16,n-1 就是 1111(二进制),那么 2n-1 就是 11111(二进制),所以说如果 fh 的第 5 位不是 1 的话,fh&n = 0 可得出 fh&(2n-1)== fh&(n-1)= i;如果是 1 的话,fh&n = n 可得出 fh&(2n-1)= i+n
查找
1 | public V get(Object key) { |
- 根据 hash 值计算位置
- 查找到指定位置,如果头节点就是要找的,直接返回它的 value
- 如果头节点 hash 值小于 0,说明正在扩容或者是红黑树,find 查找
- 如果是链表,遍历查找
为什么不支持 Key 或 Value 为 null?
-
避免歧义:在多线程环境下,get(key)方法如果返回 null,不知道这个 null 是代表 key 不存在或者是值本来就是 null;
-
简化实现:如果允许 null,代码里面就需要频繁的去判断 null 到底是代表 key 不存在或者是值本来就是 null
HashMap 为什么可以?
HashMap 设计的初衷是单线程,它有 containsKey 方法可以判断 key 是否存在。ConcurrentHashMap 不能用 containsKey,因为多线程环境下也会有歧义。比如:刚判断完 key 不存在,然后就有一个线程插入了这个 key