ConcurrentMap(1.7和1.8)

Unsafe类和内存屏障简介

不论是在jdk1.7还是jdk1.8版本中ConcurrentHashMap中使用的最为核心也是最为频繁的就是Unsafe类中的各种native本地方法。所以这里有必要先介绍一下其中用的最多的几个Unsafe类中的核心方法。主要的几个方法是Unsafe.putObjectVolatile(obj,long,obj2)、 Unsafe.getObjectVolatile、 Unsafe.putOrderedObject等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void sun::misc::Unsafe::putObjectVolatile (jobject obj, jlong offset, jobject value)
  {
  write_barrier ();
  volatile jobject *addr = (jobject *) ((char *) obj + offset);
  *addr = value;
  }

void sun::misc::Unsafe::putObject (jobject obj, jlong offset, jobject value)
  {
  jobject *addr = (jobject *) ((char *) obj + offset);
  *addr = value;
  }//用于和putObjectVolatile进行对比

jobject sun::misc::Unsafe::getObjectVolatile (jobject obj, jlong offset)
  {
  volatile jobject *addr = (jobject *) ((char *) obj + offset);
  jobject result = *addr;
  read_barrier ();
  return result;
  }

void sun::misc::Unsafe::putOrderedObject (jobject obj, jlong offset, jobject value)
  {
  volatile jobject *addr = (jobject *) ((char *) obj + offset);
  *addr = value;
  }

在上述Unsafe几个方法的源代码中,可以看到有write_barrier和read_barrier这两个内存屏障,这两个就是对应的硬件中的写屏障和读屏障,java内存模型中使用的所谓的LoadLoad、LoadStore、StoreStore、StoreLoad这几个屏障就是基于这两个屏障实现的。写屏障的作用就是禁止了指令的重排序,并且配合C语言中的volatile关键字(C中的volatile关键字只能保证可见性不能保证有序性),个人理解就是通过添加内存屏障+C中的Volatile实现了类似Java中的Volatile关键字语义,即在putObjectVolatile方法中通过内存屏障保证了有序性,再通过volatile保证将对指定地址的操作是马上写入到共享的主存中而不是线程自身的本地工作内存中,这样配合下面的getObjectVolatile方法,就可以确保每次读取到的就是最新的数据。
  对于getObjectVolatile而言,可以看到它在返回前加了read_barrier,这个读屏障的作用就是强制去读取主存中的数据而不是线程自己的本地工作内存,这样就确保了读取到的一定是最新的数据。
  最后就是putOrderedObject,这个方法和putObjectVolatile的区别源码中在于没有加write_barrier,个人理解是这个方法只保证了更新数据的可见性,但是无法保证有序性,因为没有添加屏障可能会导致最终生成的汇编指令被重排序优化,不过在ConcurrentHashMap中使用到这个方法的地方主要是在put方法更新数据的时候用到了,而关于put是加锁了的,所以个人理解的是在依据加锁过的代码区域,用putOrderedObject比putObjectVolatile好在不需要添加屏障,因为只会有一个线程进行操作,从而允许进行指令优化重排序,从而性能会更好。

简介

 众所周知,复杂度为O(1)的数据结构,在Java开发中,我们最常见到最频繁使用的就是HashMap和HashTable,但是在线程竞争激烈的并发场景中使用都不够合理。

  HashMap :先说HashMap,HashMap是线程不安全的,在并发环境下,可能会形成环状链表(扩容时可能造成,具体原因自行百度google或查看源码分析),导致get操作时,cpu空转,所以,在并发环境中使用HashMap是非常危险的。

  HashTable : HashTable和HashMap的实现原理几乎一样,差别无非是1.HashTable不允许key和value为null;2.HashTable是线程安全的。但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。


HashTable性能差主要是由于所有操作需要竞争同一把锁

原理

容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。这就是ConcurrentHashMap所采用的”分段锁”思想。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。

重要属性

concurrencyLevel:并行级别。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的

initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。

loadFactor:负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的。

JDK1.7

ConcurrentHashMap的读取并发,因为在读取的大多数时候都没有用到锁定,所以读取操作几乎是完全的并发操作,而写操作锁定的粒度又非常细,比起之前又更加快速(这一点在桶更多时表现得更明显些)。只有在求size等操作时才需要锁定整个表。而在迭代时,ConcurrentHashMap使用了不同于传统集合的快速失败迭代器的另一种迭代方式,我们称为弱一致迭代器。在这种迭代方式中,当iterator被创建后集合再发生改变就不再是抛出 ConcurrentModificationException,取而代之的是在改变时new新的数据从而不影响原有的数据,iterator完成后再将头指针替换为新的数据,这样iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。

jdk1.7中采用Segment + HashEntry的方式进行实现,结构如下:

ConcurrentHashMap初始化时,计算出Segment数组的大小ssize和每个Segment中HashEntry数组的大小cap,并初始化Segment数组的第一个元素;其中ssize大小为2的幂次方,默认为16,cap大小也是2的幂次方,最小值为2,最终结果根据初始化容量initialCapacity进行计算。

源码分析

ConcurrentHashMap的主干是个Segment数组。

1
final Segment<K,V>[] segments;

Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行操作是不用考虑锁竞争的。(就按默认的ConcurrentLeve为16来讲,理论上就允许16个线程并发执行)

所以,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑。

Segment类似于HashMap,一个Segment维护着一个HashEntry数组

1
transient volatile HashEntry<K,V>[] table;

HashEntry是目前我们提到的最小的逻辑处理单元了。一个ConcurrentHashMap维护一个Segment数组,一个Segment维护一个HashEntry数组。

1
2
3
4
5
6
7
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
//其他省略
}

我们说Segment类似哈希表,那么一些属性就跟我们之前提到的HashMap差不离,比如负载因子loadFactor,比如阈值threshold等等,看下Segment的构造方法。

1
2
3
4
5
6
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;//负载因子
this.threshold = threshold;//阈值
this.table = tab;//主干数组即HashEntry数组
}

我们来看下ConcurrentHashMap的构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
//2的sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
int sshift = 0;
//ssize 为segments数组长度,根据concurrentLevel计算得出
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 用默认值,concurrencyLevel 为 16,sshift 为 4
// 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//计算cap的大小,即Segment中HashEntry的数组长度,cap也一定为2的n次方.
// // initialCapacity 是设置整个 map 初始的大小,
// 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小
// 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
//默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
// 插入一个元素不至于扩容,插入第二个的时候才会扩容
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
//创建segments数组并初始化第一个Segment,其余的Segment延迟初始化
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 往数组写入 segment[0]
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}

初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity为16,loadFactor为0.75(负载因子,扩容时需要参考),concurrentLevel为16。

从上面的代码可以看出来,Segment数组的大小ssize是由concurrentLevel来决定的,但是却不一定等于concurrentLevel,ssize一定是大于或等于concurrentLevel的最小的2的次幂。比如:默认情况下concurrentLevel是16,则ssize为16;若concurrentLevel为14,ssize为16;若concurrentLevel为17,则ssize为32。为什么Segment的数组大小一定是2的次幂?其实主要是便于通过按位与的散列算法来定位Segment的index。

当我们用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:

  • Segment 数组长度为 16,不可以扩容
  • Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
  • 这里初始化了 segment[0],其他位置还是 null,至于为什么要初始化 segment[0],后面的代码会介绍
  • 当前 segmentShift 的值为 32 – 4 = 28,segmentMask 为 16 – 1 = 15,姑且把它们简单翻译为移位数和掩码

segmentShift和segmentMask

segmentShift和segmentMask这两个全局变量的主要作用是用来定位Segment,int j =(hash >>> segmentShift) & segmentMask。

  segmentMask:段掩码,假如segments数组长度为16,则段掩码为16-1=15;segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性

  segmentShift:2的sshift次方等于ssize,segmentShift=32-sshift。若segments长度为16,segmentShift=32-4=28;若segments长度为32,segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment。

put

put 的主流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1. 计算 key 的 hash 值
int hash = hash(key);
// 2. 根据 hash 值找到 Segment 数组中的位置 j
// hash 是 32 位,无符号右移 segmentShift(28) 位,剩下高 4 位,
// 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的前 4 位,也就是槽的数组下标
int j = (hash >>> segmentShift) & segmentMask;
// 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,
// ensureSegment(j) 对 segment[j] 进行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 3. 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}

 从源码看出,put的主要逻辑也就两步:
1.定位segment并确保定位的Segment已初始化
2.调用Segment的put方法。

接下来定位到Segment上的put方法,Segment中的put方法是要加锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//先尝试对segment加锁,如果直接加锁成功,那么node=null;如果加锁失败,则会调用scanAndLockForPut方法去获取锁,
//在这个方法中,获取锁后会返回对应HashEntry(要么原来就有要么新建一个)
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//这里是一个优化点,由于table自身是被volatile修饰的,然而put这一块代码本身是加锁了的,所以同一时间内只会有一个线程操作这部分内容,
//所以不再需要对这一块内的变量做任何volatile修饰,因为变量加了volatile修饰后,变量无法进行编译优化等,会对性能有一定的影响
//故将table赋值给put方法中的一个局部变量,从而使得能够减少volatile带来的不必要消耗。
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
//这里有一个问题:为什么不直接使用数组下标获取HashEntry,而要用entryAt来获取链表?
//这里结合网上内容个人理解是:由于Segment继承的是ReentrantLock,所以它是一个可重入锁,那么是否存在某种场景下,
//会导致同一个线程连续两次进入put方法,而由于put最终使用的putOrderedObject只是禁止了写写重排序无法保证内存可见性,
//所以这种情况下第二次put在获取链表时必须用entryAt中的volatile语义的get来获取链表,因为这种情况下下标获取的不一定是最新数据。
//不过并没有想到哪里会存在这种场景,有谁能想到的或者是我的理解有误请指出!
HashEntry<K,V> first = entryAt(tab, index);//先获取需要put的<k,v>对在当前这个segment中对应的链表的表头结点。

for (HashEntry<K,V> e = first;;) {//开始遍历first为头结点的链表
if (e != null) {//<1>
//e不为空,说明当前键值对需要存储的位置有hash冲突,直接遍历当前链表,如果链表中找到一个节点对应的key相同,
//依据onlyIfAbsent来判断是否覆盖已有的value值。
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
//进入这个条件内说明需要put的<k,y>对应的key节点已经存在,直接判断是否更新并最后break退出循环。
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;//未进入上面的if条件中,说明当前e节点对应的key不是需要的,直接遍历下一个节点。
}
else {//<2>
//进入到这个else分支,说明e为空,对应有两种情况下e可能会为空,即:
// 1>. <1>中进行循环遍历,遍历到了链表的表尾仍然没有满足条件的节点。
// 2>. e=first一开始就是null(可以理解为即一开始就遍历到了尾节点)
if (node != null) //这里有可能获取到锁是通过scanAndLockForPut方法内自旋获取到的,这种情况下依据找好或者说是新建好了对应节点,node不为空
node.setNext(first);
else// 当然也有可能是这里直接第一次tryLock就获取到了锁,从而node没有分配对应节点,即需要给依据插入的k,v来创建一个新节点
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1; //总数+1 在这里依据获取到了锁,即是线程安全的!对应了上述对count变量的使用规范说明。
if (c > threshold && tab.length < MAXIMUM_CAPACITY)//判断是否需要进行扩容
//扩容是直接重新new一个新的HashEntry数组,这个数组的容量是老数组的两倍,
//新数组创建好后再依次将老的table中的HashEntry插入新数组中,所以这个过程是十分费时的,应尽量避免。
//扩容完毕后,还会将这个node插入到新的数组中。
rehash(node);
else
//数组无需扩容,那么就直接插入node到指定index位置,这个方法里用的是UNSAFE.putOrderedObject
//网上查阅到的资料关于使用这个方法的原因都是说因为它使用的是StoreStore屏障,而不是十分耗时的StoreLoad屏障
//给我个人感觉就是putObjectVolatile是对写入对象的写入赋予了volatile语义,但是代价是用了StoreLoad屏障
//而putOrderedObject则是使用了StoreStore屏障保证了写入顺序的禁止重排序,但是未实现volatile语义导致更新后的不可见性,
//当然这里由于是加锁了,所以在释放锁前会将所有变化从线程自身的工作内存更新到主存中。
//这一块对于putOrderedObject和putObjectVolatile的区别有点混乱,不是完全理解,网上也没找到详细解答,查看了C源码也是不大确定。
//希望有理解的人看到能指点一下,后续如果弄明白了再更新这一块。
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

注意:
1、setEntryAt()操作以实现对链头的延时写,以提升性能,因为此时并不需要将该更新写入到内存,而在锁退出后该更新自然会写入内存。
2、如果scanAndLockForPut()操作返回了一个非空HashEntry,则表示在scanAndLockForPut()遍历key对应节点链时没有找到相应的节点。此时很多时候需要创建新的节点,因而它预创建HashEntry节点,所以不需要再创建,只需要更新它的next指针即可,这里使用setNext()实现延时写也时为了提升性能,因为当前修改并不需要让其他线程知道,在锁退出时修改自然会更新到内存中,如果采用直接赋值给next字段,由于next时volatile字段,会引起更新直接写入内存而增加开销。

接下来,我们说一说其中几步关键的操作。
1、初始化槽: ensureSegment
ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。

这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 这里看到为什么之前要初始化 segment[0] 了,
// 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
// 为什么要用“当前”,因为 segment[0] 可能早就扩容过了
Segment<K,V> proto = ss[0];
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);

// 初始化 segment[k] 内部的数组
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // 再次检查一遍该槽是否被其他线程初始化了。

Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

总的来说,ensureSegment(int k) 比较简单,对于并发操作使用 CAS 进行控制。

2、获取写入锁: scanAndLockForPut

当put操作尝试加锁没成功时,它不是直接进入等待状态,而是调用了scanAndLockForPut()操作,该操作持续查找key对应的节点链中是已存在节点,如果没有找到已存在的节点,则预创建一个新节点,并且尝试n次,直到尝试次数操作限制,才真正进入等待状态,即所谓的自旋等待。对最大尝试次数,目前的实现单核次数为1,多核为64

下面我们来具体分析这个方法中是怎么控制加锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node

//如果尝试加锁失败,那么就对segment[hash]对应的链表进行遍历找到需要put的这个entry所在的链表中的位置,
//这里之所以进行一次遍历找到坑位,主要是为了通过遍历过程将遍历过的entry全部放到CPU高速缓存中,
//这样在获取到锁了之后,再次进行定位的时候速度会十分快,这是在线程无法获取到锁前并等待的过程中的一种预热方式。
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
//e=null代表两种意思,第一种就是遍历链表到了最后,仍然没有发现指定key的entry;
//第二种情况是刚开始时确实通过entryForHash找到的HashEntry就是空的,即通过hash找到的table中对应位置链表为空
//当然这里之所以还需要对node==null进行判断,是因为有可能在第一次给node赋值完毕后,然后预热准备工作已经搞定,
//然后进行循环尝试获取锁,在循环次数还未达到<2>以前,某一次在条件<3>判断时发现有其它线程对这个segment进行了修改,
//那么retries被重置为-1,从而再一次进入到<1>条件内,此时如果再次遍历到链表最后时,因为上一次遍历时已经给node赋值过了,
//所以这里判断node是否为空,从而避免第二次创建对象给node重复赋值。
if (e == null) {
if (node == null) // speculatively create node
// 进到这里说明数组该位置的链表是空的,没有任何元素
// 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 顺着链表往下走
e = e.next;
}
// 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
// lock() 是阻塞方法,直到获取锁后返回
else if (++retries > MAX_SCAN_RETRIES) {
// 尝试获取锁次数超过设置的最大值,直接进入阻塞等待,这就是所谓的有限制的自旋获取锁,
//之所以这样是因为如果持有锁的线程要过很久才释放锁,这期间如果一直无限制的自旋其实是对系统性能有消耗的,
//这样无限制的自旋是不利的,所以加入最大自旋次数,超过这个次数则进入阻塞状态等待对方释放锁并获取锁。
lock();
break;
}
else if ((retries & 1) == 0 &&
// 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
// 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。

在这段逻辑中,它先获取key对应的节点链的头,然后持续遍历该链,如果节点链中不存在要插入的节点,则预创建一个节点,否则retries值自增,直到操作最大尝试次数而进入等待状态。这里需要注意最后一个else if中的逻辑:当在自旋过程中发现节点链的链头发生了变化,则更新节点链的链头,并重置retries值为-1,重新为尝试获取锁而自旋遍历。

扩容

segment 数组不能扩容,扩容是 segment 数组某个位置内部的数组 HashEntry[] 进行扩容,扩容后,容量为原来的 2 倍。
首先,我们要回顾一下触发扩容的地方,put 的时候,如果判断该值的插入会导致该 segment 的元素个数超过阈值,那么先进行扩容,再插值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* Doubles size of table and repacks entries, also adding the
* given node to new table
* 对数组进行扩容,由于扩容过程需要将老的链表中的节点适用到新数组中,所以为了优化效率,可以对已有链表进行遍历,
* 对于老的oldTable中的每个HashEntry,从头结点开始遍历,找到第一个后续所有节点在新table中index保持不变的节点fv,
* 假设这个节点新的index为newIndex,那么直接newTable[newIndex]=fv,即可以直接将这个节点以及它后续的链表中内容全部直接复用copy到newTable中
* 这样最好的情况是所有oldTable中对应头结点后跟随的节点在newTable中的新的index均和头结点一致,那么就不需要创建新节点,直接复用即可。
* 最坏情况当然就是所有节点的新的index全部发生了变化,那么就全部需要重新依据k,v创建新对象插入到newTable中。
*/
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 创建新数组
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
    int sizeMask = newCapacity - 1;
 
    // 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是链表的第一个元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 计算应该放置在新数组中的位置,
            // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask;
            if (next == null)   // 该位置处只有一个元素,那比较好办
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是链表表头
                HashEntry<K,V> lastRun = e;
                // idx 是当前链表的头结点 e 的新位置
                int lastIdx = idx;
 
                // 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是处理 lastRun 之前的节点,
                //    这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

它创建一个大原来两倍容量的数组,然后遍历原来数组以及数组项中的每条链,对每个节点重新计算它的数组索引,然后创建一个新的节点插入到新数组中,这里需要重新创建一个新节点而不是修改原有节点的next指针是为了在做rehash时可以保证其他线程的get遍历操作可以正常在原有的链上正常工作,有点copy-on-write思想。

然而Doug Lea继续优化了这段逻辑,为了减少重新创建新节点的开销,这里做了两点优化:1,对只有一个节点的链,直接将该节点赋值给新数组对应项即可(之所以能这么做是因为Segment中数组的长度也永远是2的倍数,而将数组长度扩大成原来的2倍,那么新节点在新数组中的位置只能是相同的索引号或者原来索引号加原来数组的长度,因而可以保证每条链在rehash是不会相互干扰);2,对有多个节点的链,先遍历该链找到第一个后面所有节点的索引值不变的节点p,然后只重新创建节点p以前的节点即可,此时新节点链和旧节点链同时存在,在p节点相遇,这样即使有其他线程在当前链做遍历也能正常工作。

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);//获取key对应hash值
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;//获取对应h值存储所在segments数组中内存偏移量
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//通过Unsafe中的getObjectVolatile方法进行volatile语义的读,获取到segments在偏移量为u位置的分段Segment,
//并且分段Segment中对应table数组不为空
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {//获取h对应这个分段中偏移量为xxx下的HashEntry的链表头结点,然后对链表进行 遍历
//###这里第一次初始化通过getObjectVolatile获取HashEntry时,获取到的是主存中最新的数据,但是在后续遍历过程中,有可能数据被其它线程修改
//从而导致其实这里最终返回的可能是过时的数据,所以这里就是ConcurrentHashMap所谓的弱一致性的体现,containsKey方法也一样!!!!
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

这个方法是弱一致性的,所以有可能会获取到过时的数据,如果业务场景要求获取数据的强一致性,不建议用这个。

get 的时候在同一个 segment 中发生了 put 或 remove 操作。

  • put 操作的线程安全性。
    初始化槽,这个我们之前就说过了,使用了 CAS 来初始化 Segment 中的数组。
    添加节点到链表的操作是插入到表头的,所以,如果这个时候 get 操作在链表遍历的过程已经到了中间,是不会影响的。

当然,另一个并发问题就是 get 操作在 put 之后,需要保证刚刚插入表头的节点被读取,这个依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。

扩容。扩容是新创建了数组,然后进行迁移数据,最后面将 newTable 设置给属性 table。所以,如果 get 操作此时也在进行,那么也没关系,如果 get 先行,那么就是在旧的 table 上做查询操作;而 put 先行,那么 put 操作的可见性保证就是 table 使用了 volatile 关键字。

  • remove 操作的线程安全性。
    如果 remove 先破坏了一个节点,分两种情况考虑。 1、如果此节点是头结点,那么需要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,但是 volatile 并不能提供数组内部操作的可见性保证,所以源码中使用了 UNSAFE 来操作数组,请看方法 setEntryAt。2、如果要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。

remove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/** 
* Remove; match on key only if value null, else match both.
*/
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}

在JDK 1.6版本中,remove操作比较直观,它先找到key对应的节点链的链头(数组中的某个项),然后遍历该节点链,如果在节点链中找到key相等的节点,则为该节点之前的所有节点重新创建节点并组成一条新链,将该新链的链尾指向找到节点的下一个节点。这样如前面rehash提到的,同时有两条链存在,即使有另一个线程正在该链上遍历也不会出问题。

然而Doug Lea又挖掘到了新的优化点,在1.7中,他不再重新创建一条新的链,而是只在当起缓存中将链中找到的节点移除。当移除的是链头则更新数组项的值,否则更新找到节点的前一个节点的next指针。这也是HashEntry中next指针没有设置成final的原因。当然remove操作如果第一次尝试获得锁失败也会如put操作一样先进入自旋状态,这里的scanAndLock和scanAndLockForPut类似,只是它不做预创建节点的步骤。

size实现

因为ConcurrentHashMap是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个Segment对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的,因为在计算后面几个Segment的元素个数时,已经计算过的Segment同时可能有数据的插入或则删除,在1.7的实现中,采用了如下方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 默认自旋次数,超过这个次数直接加锁,防止在size方法中由于不停有线程在更新map
* 导致无限的进行自旋影响性能,当然这种会导致ConcurrentHashMap使用了这一规则的方法
* 如size、clear是弱一致性的。
*/
static final int RETRIES_BEFORE_LOCK = 2;

try {
//无限for循环,结束条件就是任意前后两次遍历过程中modcount值的和是一样的,说明第二次遍历没有做任何变化
//这里就是前面介绍的为了防止由于有线程不断在更新map而导致每次遍历过程一直发现modCount和上一次不一样
//从而导致线程一直进行遍历验证前后两次modCount,为了防止这种情况发生,加了一个最多重复的次数限制,
//超过这个次数则直接强制对所有的segment进行加锁,不过这里需要注意如果出现这种情况,会导致本来要延迟创建的所有segment
//均在这个过程中被创建
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
//由于只有在retries等于RETRIES_BEFORE_LOCK时才会执行强制加锁,并且由于是用的retries++,
//所以强制加锁完毕后,retries的值是一定会大于RETRIES_BEFORE_LOCK的,
//这样就防止正常遍历而没进行加锁时进行锁释放的情况
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}

先采用不加锁的方式,连续计算元素的个数,最多计算3次:
1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;
2、如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数;

1.8

1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,结构如下:

原理

 在ConcurrentHashMap中通过一个Node<K,V>[]数组来保存添加到map中的键值对,而在同一个数组位置是通过链表和红黑树的形式来保存的。但是这个数组只有在第一次添加元素的时候才会初始化,否则只是初始化一个ConcurrentHashMap对象的话,只是设定了一个sizeCtl变量,这个变量用来判断对象的一些状态和是否需要扩容。

第一次添加元素的时候,默认初期长度为16,当往map中继续添加元素的时候,通过hash值跟数组长度取与来决定放在数组的哪个位置,如果出现放在同一个位置的时候,优先以链表的形式存放,在同一个位置的个数又达到了8个以上,如果数组的长度还小于64的时候,则会扩容数组。如果数组的长度大于等于64了的话,在会将该节点的链表转换成树。

  通过扩容数组的方式来把这些节点给分散开。然后将这些元素复制到扩容后的新的数组中,同一个链表中的元素通过hash值的数组长度位来区分,是还是放在原来的位置还是放到扩容的长度的相同位置去 。在扩容完成之后,如果某个节点的是树,同时现在该节点的个数又小于等于6个了,则会将该树转为链表。

这个时候因为数组的长度才为16,则不会转化为树,而是会进行扩容。扩容后数组大概是这样的:

如果数组扩张后长度达到64了,且继续在某个节点的后面添加元素达到8个以上的时候,则会出现转化为红黑树的情况。转化之后大概是这样:

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
static final int MOVED = -1; // 表示正在转移
static final int TREEBIN = -2; // 表示已经转换成树
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
transient volatile Node<K,V>[] table;//默认没初始化的数组,用来保存元素
private transient volatile Node<K,V>[] nextTable;//转移的时候用的数组
/**
* 用来控制表初始化和扩容的,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75
* 当为负的时候,说明表正在初始化或扩张,
* -1表示初始化
* -(1+n) n:表示活动的扩张线程
*/
private transient volatile int sizeCtl;
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

通过提供初始容量,计算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。如 initialCapacity 为 10,那么得到 sizeCtl 为 16,如果 initialCapacity 为 11,得到 sizeCtl 为 32。
在任何一个构造方法中,都没有对存储Map元素Node的table变量进行初始化。而是在第一次put操作的时候在进行初始化。

put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
* 单纯的额调用putVal方法,并且putVal的第三个参数设置为false
* 当设置为false的时候表示这个value一定会设置
* true的时候,只有当这个key的value为空的时候才会设置
*/
public V put(K key, V value) {
return putVal(key, value, false);
}

/*
* 当添加一对键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了,
* 如果没有的话就初始化数组
* 然后通过计算hash值来确定放在数组的哪个位置
* 如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来
* 如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制
* 最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作
* 然后判断当前取出的节点位置存放的是链表还是树
* 如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,如果key相等,并且key的hash值也相等的话,
* 则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾
* 如果是树的话,则调用putTreeVal方法把这个元素添加到树中去
* 最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话,
* 则调用treeifyBin方法来尝试将处的链表转为树,或者扩容数组
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();//K,V都不能为空,否则的话跑出异常
int hash = spread(key.hashCode()); //取得key的hash值
int binCount = 0; //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
for (Node<K,V>[] tab = table;;) { //
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //第一次put的时候table没有初始化,则初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //通过哈希计算出一个表中的位置因为n是数组的长度,所以(n-1)&hash肯定不会出现数组越界
if (casTabAt(tab, i, null, //如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的
new Node<K,V>(hash, key, value, null))) //创建一个Node添加到数组中区,null表示的是下一个节点为空
break; // no lock when adding to empty bin
}
/*
* 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,
* 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失
*/
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
/*
* 如果在这个位置有元素的话,就采用synchronized的方式加锁,
* 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历,
* 如果找到了key和key的hash值都一样的节点,则把它的值替换到
* 如果没找到的话,则添加在链表的最后面
* 否则,是树的话,则调用putTreeVal方法添加到树中去
*
* 在添加完之后,会对该节点上关联的的数目进行判断,
* 如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容
*/
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) { //再次取出要存储的位置的元素,跟前面取出来的比较
if (fh >= 0) { //取出来的元素的hash值大于0,当转换为树之后,hash值为-2
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { //遍历这个链表
K ek;
if (e.hash == hash && //要存的元素的hash,key跟要存储的位置的节点的相同的时候,替换掉该节点的value即可
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) //当使用putIfAbsent的时候,只有在这个key没有设置值得时候才设置
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空,
pred.next = new Node<K,V>(hash, key, //为空的话把这个要加入的节点设置为当前节点的下一个节点
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //表示已经转化成红黑树类型了
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, //调用putTreeVal方法,将该元素添加到树中去
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为tree
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); //计数
return null;
}

只有在执行第一次put方法时才会调用initTable()初始化Node数组,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 初始化数组table,
* 如果sizeCtl小于0,说明别的数组正在进行初始化,则让出执行权
* 如果sizeCtl大于0的话,则初始化一个大小为sizeCtl的数组
* 否则的话初始化一个默认大小(16)的数组
* 然后设置sizeCtl的值为数组长度的3/4
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { //第一次put的时候,table还没被初始化,进入while
if ((sc = sizeCtl) < 0) //sizeCtl初始值为0,当小于0的时候表示在别的线程在初始化表或扩展表
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //SIZECTL:表示当前对象的内存偏移量,sc表示期望值,-1表示要替换的值,设定为-1表示要初始化表了
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //指定了大小的时候就创建指定大小的Node数组,否则创建指定大小(16)的Node数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc; //初始化后,sizeCtl长度为数组长度的3/4
}
break;
}
}
return tab;
}

扩容

我们可以看到,在同一个节点的个数超过8个的时候,会调用treeifyBin方法来看看是扩容还是转化为一棵树,同时在每次添加完元素的addCount方法中,也会判断当前数组中的元素是否达到了sizeCtl的量,如果达到了的话,则会进入transfer方法去扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Replaces all linked nodes in bin at given index unless table is
* too small, in which case resizes instead.
* 当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
System.out.println("treeifyBin方\t==>数组长:"+tab.length);
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //MIN_TREEIFY_CAPACITY 64
tryPresize(n << 1); // 数组扩容
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) { //使用synchronized同步器,将该节点出的链表转为树
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null; //hd:树的头(head)
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null) //把Node组成的链表,转化为TreeNode的链表,头结点任然放在相同的位置
hd = p; //设置head
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的链表放入容器TreeBin中
}
}
}
}
}

用链表头部的TreeNode来构造一个TreeBin,在TreeBin容器中,将链表转化为红黑树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null); //创建的TreeBin是一个空节点,hash值为TREEBIN(-2)
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}//
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {//x代表的是转换为树之前的顺序遍历到链表的位置的节点,r代表的是根节点
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h) //
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk); //当key不可以比较,或者相等的时候采取的一种排序措施
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {//在这里判断要放的left/right是否为空,不为空继续用left/right节点来判断
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x); //每次插入一个元素的时候都调用balanceInsertion来保持红黑树的平衡
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}

可以看到当需要扩容的时候,调用的时候tryPresize方法,看看trePresize的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
* 扩容表为指可以容纳指定个数的大小(总是2的N次方)
* 假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12
* 计算出来c的值为64,则要扩容到sizeCtl≥为止
* 第一次扩容之后 数组长:32 sizeCtl:24
* 第二次扩容之后 数组长:64 sizeCtl:48
* 第二次扩容之后 数组长:128 sizeCtl:94 --> 这个时候才会退出扩容
*/
private final void tryPresize(int size) {
/*
* MAXIMUM_CAPACITY = 1 << 30
* 如果给定的大小大于等于数组容量的一半,则直接使用最大容量,
* 否则使用tableSizeFor算出来
* 后面table一直要扩容到这个值小于等于sizeCtrl(数组长度的3/4)才退出扩容
*/
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// printTable(tab); 调试用的
/*
* 如果数组table还没有被初始化,则初始化一个大小为sizeCtrl和刚刚算出来的c中较大的一个大小的数组
* 初始化的时候,设置sizeCtrl为-1,初始化完成之后把sizeCtrl设置为数组长度的3/4
* 为什么要在扩张的地方来初始化数组呢?这是因为如果第一次put的时候不是put单个元素,
* 而是调用putAll方法直接put一个map的话,在putALl方法中没有调用initTable方法去初始化table,
* 而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断
*/
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //初始化tab的时候,把sizeCtl设为-1
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
/*
* 一直扩容到的c小于等于sizeCtl或者数组长度大于最大长度的时候,则退出
* 所以在一次扩容之后,不是原来长度的两倍,而是2的n次方倍
*/
else if (c <= sc || n >= MAXIMUM_CAPACITY) {
break; //退出扩张
}
else if (tab == table) {
int rs = resizeStamp(n);
/*
* 如果正在扩容Table的话,则帮助扩容
* 否则的话,开始新的扩容
* 在transfer操作,将第一个参数的table中的元素,移动到第二个元素的table中去,
* 虽然此时第二个参数设置的是null,但是,在transfer方法中,当第二个参数为null的时候,
* 会创建一个两倍大小的table
*/
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
/*
* transfer的线程数加一,该线程将进行transfer的帮忙
* 在transfer的时候,sc表示在transfer工作的线程数
*/
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
/*
* 没有在初始化或扩容,则开始扩容
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) {
transfer(tab, null);
}
}
}
}

在tryPresize方法中,并没有加锁,允许多个线程进入,如果数组正在扩张,则当前线程也去帮助扩容。数组扩容的主要方法就是transfer方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
* 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置
* 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用,
* 每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,那只有一个线程会对其进行扩容的复制移动操作
* 扩容的时候会一直遍历,知道复制完所有节点,没处理一个节点的时候会在链表的头部设置一个fwd节点,这样其他线程就会跳过他,
* 复制后在新数组中的链表不是绝对的反序的
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //MIN_TRANSFER_STRIDE 用来控制不要占用太多CPU
stride = MIN_TRANSFER_STRIDE; // subdivide range //MIN_TRANSFER_STRIDE=16
/*
* 如果复制的目标nextTab为null的话,则初始化一个table两倍长的nextTab
* 此时nextTable被设置值了(在初始情况下是为null的)
* 因为如果有一个线程开始了表的扩张的时候,其他线程也会进来帮忙扩张,
* 而只是第一个开始扩张的线程需要初始化下目标数组
*/
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
/*
* 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点
* 这是一个空的标志节点
*/
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; //是否继续向前查找的标志位
boolean finishing = false; // to ensure sweep(清扫) before committing nextTab,在完成之前重新在扫描一遍数组,看看有没完成的没
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing) {
advance = false;
}
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { //已经完成转移
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); //设置sizeCtl为扩容后的0.75
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
return;
}
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null) //数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) { //加锁操作
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) { //该节点的hash值大于等于0,说明是一个Node节点
/*
* 因为n的值为数组的长度,且是power(2,x)的,所以,在&操作的结果只可能是0或者n
* 根据这个规则
* 0--> 放在新表的相同位置
* n--> 放在新表的(n+原来位置)
*/
int runBit = fh & n;
Node<K,V> lastRun = f;
/*
* lastRun 表示的是需要复制的最后一个节点
* 每当新节点的hash&n -> b 发生变化的时候,就把runBit设置为这个结果b
* 这样for循环之后,runBit的值就是最后不变的hash&n的值
* 而lastRun的值就是最后一次导致hash&n 发生变化的节点(假设为p节点)
* 为什么要这么做呢?因为p节点后面的节点的hash&n 值跟p节点是一样的,
* 所以在复制到新的table的时候,它肯定还是跟p节点在同一个位置
* 在复制完p节点之后,p节点的next节点还是指向它原来的节点,就不需要进行复制了,自己就被带过去了
* 这也就导致了一个问题就是复制后的链表的顺序并不一定是原来的倒序
*/
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n; //n的值为扩张前的数组的长度
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
/*
* 构造两个链表,顺序大部分和原来是反的
* 分别放到原来的位置和新增加的长度的相同位置(i/n+i)
*/
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
/*
* 假设runBit的值为0,
* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点
* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
*/
ln = new Node<K,V>(ph, pk, pv, ln);
else
/*
* 假设runBit的值不为0,
* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点
* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
*/
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { //否则的话是一个树节点
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
/*
* 在复制完树节点之后,判断该节点处构成的树还有几个节点,
* 如果≤6个的话,就转回为一个链表
*/
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

下面的两点一定要注意:

    ·复制之后的新链表不是旧链表的绝对倒序。

    ·在扩容的时候每个线程都有处理的步长,最少为16,在这个步长范围内的数组节点只有自己一个线程来处理

size实现

1.8中使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}

1、初始化时counterCells为空,在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化;

2、如果CounterCell数组counterCells为空,调用fullAddCount()方法进行初始化,并插入对应的记录数,通过CAS设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell数组,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}

3、如果通过CAS设置cellsBusy字段失败的话,则继续尝试通过CAS修改baseCount字段,如果修改baseCount字段成功的话,就退出循环,否则继续循环插入CounterCell对象;

1
2
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;

所以在1.8中的size实现比1.7简单多,因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

通过累加baseCount和CounterCell数组中的数量,即可得到元素的总个数;

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*
* 相比put方法,get就很单纯了,支持并发操作,
* 当key为null的时候回抛出NullPointerException的异常
* get操作通过首先计算key的hash值来确定该元素放在数组的哪个位置
* 然后遍历该位置的所有节点
* 如果不存在的话返回null
*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 判断头结点是否就是我们需要的节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果头结点的 hash 小于 0,说明 正在扩容,或者该位置是红黑树
else if (eh < 0)
// 参考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k)
return (p = e.find(h, key)) != null ? p.val : null;

// 遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

同步机制

ConcurrentHashMap是如果来做到并发安全,又是如何做到高效的并发的呢?

首先是读操作,从源码中可以看出来,在get操作中,根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的。

1、那么写操作呢?

  分析这个之前,先看看什么情况下会引起数组的扩容,扩容是通过transfer方法来进行的。而调用transfer方法的只有trePresize、helpTransfer和addCount三个方法。

这三个方法又是分别在什么情况下进行调用的呢?

  • tryPresize是在treeIfybin和putAll方法中调用,treeIfybin主要是在put添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树。

  • helpTransfer是在当一个线程要对table中元素进行操作的时候,如果检测到节点的HASH值为MOVED的时候,就会调用helpTransfer方法,在helpTransfer中再调用transfer方法来帮助完成数组的扩容

  • addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。

所以引起数组扩容的情况如下:

  • 只有在往map中添加元素的时候,在某一个节点的数目已经超过了8个,同时数组的长度又小于64的时候,才会触发数组的扩容。

  • 当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容

  
那么在扩容的时候,可以不可以对数组进行读写操作呢?

事实上是可以的。当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为fwd节点),那就可以进行设置操作。
如果该节点已经被处理了,则当前线程也会加入到扩容的操作中去。

那么,多个线程又是如何同步处理的呢?

在ConcurrentHashMap中,同步处理主要是通过Synchronized和unsafe两种方式来完成的。

  • 在取得sizeCtl、某个位置的Node的时候,使用的都是unsafe的方法,来达到并发安全的目的

  • 当需要在某个位置设置节点的时候,则会通过Synchronized的同步机制来锁定该位置的节点。

  • 在数组扩容的时候,则通过处理的步长和fwd节点来达到并发安全的目的,通过设置hash值为MOVED

  • 当把某个位置的节点复制到扩张后的table的时候,也通过Synchronized的同步机制来保证现程安全

思考

在put中获取链表的头节点时,为什么不直接用tab[i]?

1
2
3
4
5
6
 // 这个是 segment 内部的数组
HashEntry<K,V>[] tab = table;
// 再利用 hash 值,求应该放置的数组下标
int index = (tab.length - 1) & hash;
// first 是数组该位置处的链表的表头
HashEntry<K,V> first = entryAt(tab, index);

将volatile的table字段引用赋值给tab局部变量,为了保证每次读取的table中的数组项都是最新的值,因而调用entryAt()方法获取数组项的值而不是通过tab[index]方式直接获取(在put操作更新节点链时,它采用Unsafe.putOrderedObject()操作,此时它对链头的更新只局限与当前线程,为了保证接下来的put操作能够读取到上一次的更新结果,需要使用volatile的语法去读取节点链的链头)。
entryAt方法:

1
2
3
4
5
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}

setEntryAt方法:

1
2
3
4
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}

1.8synchronized用在哪些地方

1、put的时候,如果在table[i]上有元素的话,就采用synchronized的方式加锁
2、在transfer扩容的时候,使用synchronized重复对table[i]上的第一个元素加锁
3、链表转树的过程也是一样

参考资料