开篇问题
(1)ConcurrentHashMap与HashMap的数据结构是否一样?
(2)HashMap在多线程环境下何时会出现并发安全问题?
(3)ConcurrentHashMap是怎么解决并发安全问题的?
(4)ConcurrentHashMap使用了哪些锁?
(5)ConcurrentHashMap的扩容是怎么进行的?
(6)ConcurrentHashMap是否是强一致性的?
(7)ConcurrentHashMap不能解决哪些问题?
(8)ConcurrentHashMap中有哪些不常见的技术值得学习?
简介
ConcurrentHashMap是HashMap的线程安全版本,内部也是使用(数组 + 链表 + 红黑树)的结构来存储元素。
相比于同样线程安全的HashTable来说,效率等各方面都有极大地提高。
依赖架构
各种锁简介
这里先简单介绍一下各种锁,以便下文讲到相关概念时能有个印象。
(1)synchronized
java中的关键字,内部实现为监视器锁,主要是通过对象监视器在对象头中的字段来表明的。
synchronized从旧版本到现在已经做了很多优化了,在运行时会有三种存在方式:偏向锁,轻量级锁,重量级锁。
偏向锁,是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,降低获取锁的代价。
轻量级锁,是指当锁是偏向锁时,被另一个线程所访问,偏向锁会升级为轻量级锁,这个线程会通过自旋的方式尝试获取锁,不会阻塞,提高性能。
重量级锁,是指当锁是轻量级锁时,当自旋的线程自旋了一定的次数后,还没有获取到锁,就会进入阻塞状态,该锁升级为重量级锁,重量级锁会使其他线程阻塞,性能降低。
(2)CAS
CAS,Compare And Swap,它是一种乐观锁,认为对于同一个数据的并发操作不一定会发生修改,在更新数据的时候,尝试去更新数据,如果失败就不断尝试。
(3)volatile(非锁)
java中的关键字,当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。(这里牵涉到java内存模型的知识,感兴趣的同学可以自己查查相关资料)
volatile只保证可见性,不保证原子性,比如 volatile修改的变量 i,针对i++操作,不保证每次结果都正确,因为i++操作是两步操作,相当于 i = i +1,先读取,再加1,这种情况volatile是无法保证的。
(4)自旋锁
自旋锁,是指尝试获取锁的线程不会阻塞,而是循环的方式不断尝试,这样的好处是减少线程的上下文切换带来的开锁,提高性能,缺点是循环会消耗CPU。
(5)分段锁
分段锁,是一种锁的设计思路,它细化了锁的粒度,主要运用在ConcurrentHashMap中,实现高效的并发操作,当操作不需要更新整个数组时,就只锁数组中的一项就可以了。
(5)ReentrantLock
可重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁,可重入锁的优点是避免死锁。
其实,synchronized也是可重入锁。
源码分析
构造方法
1 | public ConcurrentHashMap() { |
构造方法与HashMap对比可以发现,没有了HashMap中的threshold和loadFactor,而是改用了sizeCtl来控制,而且只存储了容量在里面,那么它是怎么用的呢?官方给出的解释如下:
(1)-1,表示有线程正在进行初始化操作
(2)-(1 + nThreads),表示有n个线程正在一起扩容
(3)0,默认值,后续在真正初始化的时候使用默认容量
(4)> 0,初始化或扩容完成后下一次的扩容门槛
至于,官方这个解释对不对我们后面再讨论。
添加元素
1 | public V put(K key, V value) { |
整体流程跟HashMap比较类似,大致是以下几步:
(1)如果桶数组未初始化,则初始化;
(2)如果待插入的元素所在的桶为空,则尝试把此元素直接插入到桶的第一个位置;
(3)如果正在扩容,则当前线程一起加入到扩容的过程中;
(4)如果待插入的元素所在的桶不为空且不在迁移元素,则锁住这个桶(分段锁);
(5)如果当前桶中元素以链表方式存储,则在链表中寻找该元素或者插入元素;
(6)如果当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;
(7)如果元素存在,则返回旧值;
(8)如果元素不存在,整个Map的元素个数加1,并检查是否需要扩容;
添加元素操作中使用的锁主要有(自旋锁 + CAS + synchronized + 分段锁)。
为什么使用synchronized而不是ReentrantLock?
因为synchronized已经得到了极大地优化,在特定情况下并不比ReentrantLock差。
初始化桶数组
第一次放元素时,初始化桶数组。
1 | private final Node<K,V>[] initTable() { |
(1)使用CAS锁控制只有一个线程初始化桶数组;
(2)sizeCtl在初始化后存储的是扩容门槛;
(3)扩容门槛写死的是桶数组大小的0.75倍,桶数组大小即map的容量,也就是最多存储多少个元素。
判断是否需要扩容
每次添加元素后,元素数量加1,并判断是否达到扩容门槛,达到了则进行扩容或协助扩容。
1 | private final void addCount(long x, int check) { |
(1)元素个数的存储方式类似于LongAdder类,存储在不同的段上,减少不同线程同时更新size时的冲突;
(2)计算元素个数时把这些段的值及baseCount相加算出总的元素个数;
(3)正常情况下sizeCtl存储着扩容门槛,扩容门槛为容量的0.75倍;
(4)扩容时sizeCtl高位存储扩容邮戳(resizeStamp),低位存储扩容线程数加1(1+nThreads);
(5)其它线程添加元素后如果发现存在扩容,也会加入的扩容行列中来;
协助扩容(迁移元素)
线程添加元素时发现正在扩容且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。
1 | final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { |
当前桶元素迁移完成了才去协助迁移其它桶元素;
迁移元素
扩容时容量变为两倍,并把部分元素迁移到其它桶中。
1 | private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { |
(1)新桶数组大小是旧桶数组的两倍;
(2)迁移元素先从靠后的桶开始;
(3)迁移完成的桶在里面放置一ForwardingNode类型的元素,标记该桶迁移完成;
(4)迁移时根据hash&n是否等于0把桶中元素分化成两个链表或树;
(5)低位链表(树)存储在原来的位置;
(6)高们链表(树)存储在原来的位置加n的位置;
(7)迁移元素时会锁住当前桶,也是分段锁的思想;
删除元素
删除元素跟添加元素一样,都是先找到元素所在的桶,然后采用分段锁的思想锁住整个桶,再进行操作。
1 | public V remove(Object key) { |
(1)计算hash;
(2)如果所在的桶不存在,表示没有找到目标元素,返回;
(3)如果正在扩容,则协助扩容完成后再进行删除操作;
(4)如果是以链表形式存储的,则遍历整个链表查找元素,找到之后再删除;
(5)如果是以树形式存储的,则遍历树查找元素,找到之后再删除;
(6)如果是以树形式存储的,删除元素之后树较小,则退化成链表;
(7)如果确实删除了元素,则整个map元素个数减1,并返回旧值;
(8)如果没有删除元素,则返回null;
获取元素
获取元素,根据目标key所在桶的第一个元素的不同采用不同的方式获取元素,关键点在于find()方法的重写。
1 | public V get(Object key) { |
(1)hash到元素所在的桶;
(2)如果桶中第一个元素就是该找的元素,直接返回;
(3)如果是树或者正在迁移元素,则调用各自Node子类的find()方法寻找元素;
(4)如果是链表,遍历整个链表寻找元素;
(5)获取元素没有加锁;
获取元素个数
元素个数的存储也是采用分段的思想,获取元素个数时需要把所有段加起来。
1 | public int size() { |
(1)元素的个数依据不同的线程存在在不同的段里;(见addCounter()分析)
(2)计算CounterCell所有段及baseCount的数量之和;
(3)获取元素个数没有加锁;
总结
(1)ConcurrentHashMap是HashMap的线程安全版本;
(2)ConcurrentHashMap采用(数组 + 链表 + 红黑树)的结构存储元素;
(3)ConcurrentHashMap相比于同样线程安全的HashTable,效率要高很多;
(4)ConcurrentHashMap采用的锁有 synchronized,CAS,自旋锁,分段锁,volatile等;
(5)ConcurrentHashMap中没有threshold和loadFactor这两个字段,而是采用sizeCtl来控制;
(6)sizeCtl = -1,表示正在进行初始化;
(7)sizeCtl = 0,默认值,表示后续在真正初始化的时候使用默认容量;
(8)sizeCtl > 0,在初始化之前存储的是传入的容量,在初始化或扩容后存储的是下一次的扩容门槛;
(9)sizeCtl = (resizeStamp << 16) + (1 + nThreads),表示正在进行扩容,高位存储扩容邮戳,低位存储扩容线程数加1;
(10)更新操作时如果正在进行扩容,当前线程协助扩容;
(11)更新操作会采用synchronized锁住当前桶的第一个元素,这是分段锁的思想;
(12)整个扩容过程都是通过CAS控制sizeCtl这个字段来进行的,这很关键;
(13)迁移完元素的桶会放置一个ForwardingNode节点,以标识该桶迁移完毕;
(14)元素个数的存储也是采用的分段思想,类似于LongAdder的实现;
(15)元素个数的更新会把不同的线程hash到不同的段上,减少资源争用;
(16)元素个数的更新如果还是出现多个线程同时更新一个段,则会扩容段(CounterCell);
(17)获取元素个数是把所有的段(包括baseCount和CounterCell)相加起来得到的;
(18)查询操作是不会加锁的,所以ConcurrentHashMap不是强一致性的;
(19)ConcurrentHashMap中不能存储key或value为null的元素;
彩蛋——值得学习的技术
ConcurrentHashMap中有哪些值得学习的技术呢?
我认为有以下几点:
(1)CAS + 自旋,乐观锁的思想,减少线程上下文切换的时间;
(2)分段锁的思想,减少同一把锁争用带来的低效问题;
(3)CounterCell,分段存储元素个数,减少多线程同时更新一个字段带来的低效;
(4)@sun.misc.Contended(CounterCell上的注解),避免伪共享;(p.s.伪共享我们后面也会讲的^^)
(5)多线程协同进行扩容;
(6)你又学到了哪些呢?
彩蛋——不能解决的问题
ConcurrentHashMap不能解决什么问题呢?
请看下面的例子:
1 | private static final Map<Integer, Integer> map = new ConcurrentHashMap<>(); |
这里如果有多个线程同时调用unsafeUpdate()这个方法,ConcurrentHashMap还能保证线程安全吗?
答案是不能。因为get()之后if之前可能有其它线程已经put()了这个元素,这时候再put()就把那个线程put()的元素覆盖了。
那怎么修改呢?
答案也很简单,使用putIfAbsent()方法,它会保证元素不存在时才插入元素,如下:
1 | public void safeUpdate(Integer key, Integer value) { |
那么,如果上面oldValue不是跟null比较,而是跟一个特定的值比如1进行比较怎么办?也就是下面这样:
1 | public void unsafeUpdate(Integer key, Integer value) { |
这样的话就没办法使用putIfAbsent()方法了。
其实,ConcurrentHashMap还提供了另一个方法叫replace(K key, V oldValue, V newValue)可以解决这个问题。
replace(K key, V oldValue, V newValue)这个方法可不能乱用,如果传入的newValue是null,则会删除元素。
1 | public void safeUpdate(Integer key, Integer value) { |
那么,如果if之后不是简单的put()操作,而是还有其它业务操作,之后才是put(),比如下面这样,这该怎么办呢?
1 | public void unsafeUpdate(Integer key, Integer value) { |
这时候就没办法使用ConcurrentHashMap提供的方法了,只能业务自己来保证线程安全了,比如下面这样:
1 | public void safeUpdate(Integer key, Integer value) { |