Unsafe类和内存屏障简介
不论是在jdk1.7还是jdk1.8版本中ConcurrentHashMap中使用的最为核心也是最为频繁的就是Unsafe类中的各种native本地方法。所以这里有必要先介绍一下其中用的最多的几个Unsafe类中的核心方法。主要的几个方法是Unsafe.putObjectVolatile(obj,long,obj2)、 Unsafe.getObjectVolatile、 Unsafe.putOrderedObject等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void sun::misc::Unsafe::putObjectVolatile (jobject obj, jlong offset, jobject value) { write_barrier (); volatile jobject *addr = (jobject *) ((char *) obj + offset); *addr = value; } void sun::misc::Unsafe::putObject (jobject obj, jlong offset, jobject value) { jobject *addr = (jobject *) ((char *) obj + offset); *addr = value; } jobject sun::misc::Unsafe::getObjectVolatile (jobject obj, jlong offset) { volatile jobject *addr = (jobject *) ((char *) obj + offset); jobject result = *addr; read_barrier (); return result; } void sun::misc::Unsafe::putOrderedObject (jobject obj, jlong offset, jobject value) { volatile jobject *addr = (jobject *) ((char *) obj + offset); *addr = value; }
在上述Unsafe几个方法的源代码中,可以看到有write_barrier和read_barrier这两个内存屏障,这两个就是对应的硬件中的写屏障和读屏障,java内存模型中使用的所谓的LoadLoad、LoadStore、StoreStore、StoreLoad这几个屏障就是基于这两个屏障实现的。写屏障的作用就是禁止了指令的重排序,并且配合C语言中的volatile关键字(C中的volatile关键字只能保证可见性不能保证有序性),个人理解就是通过添加内存屏障+C中的Volatile实现了类似Java中的Volatile关键字语义,即在putObjectVolatile方法中通过内存屏障保证了有序性,再通过volatile保证将对指定地址的操作是马上写入到共享的主存中而不是线程自身的本地工作内存中,这样配合下面的getObjectVolatile方法,就可以确保每次读取到的就是最新的数据。 对于getObjectVolatile而言,可以看到它在返回前加了read_barrier,这个读屏障的作用就是强制去读取主存中的数据而不是线程自己的本地工作内存,这样就确保了读取到的一定是最新的数据。 最后就是putOrderedObject,这个方法和putObjectVolatile的区别源码中在于没有加write_barrier,个人理解是这个方法只保证了更新数据的可见性,但是无法保证有序性,因为没有添加屏障可能会导致最终生成的汇编指令被重排序优化,不过在ConcurrentHashMap中使用到这个方法的地方主要是在put方法更新数据的时候用到了,而关于put是加锁了的,所以个人理解的是在依据加锁过的代码区域,用putOrderedObject比putObjectVolatile好在不需要添加屏障,因为只会有一个线程进行操作,从而允许进行指令优化重排序,从而性能会更好。
简介
众所周知,复杂度为O(1)的数据结构,在Java开发中,我们最常见到最频繁使用的就是HashMap和HashTable,但是在线程竞争激烈的并发场景中使用都不够合理。
HashMap :先说HashMap,HashMap是线程不安全的,在并发环境下,可能会形成环状链表(扩容时可能造成,具体原因自行百度google或查看源码分析),导致get操作时,cpu空转,所以,在并发环境中使用HashMap是非常危险的。
HashTable : HashTable和HashMap的实现原理几乎一样,差别无非是1.HashTable不允许key和value为null;2.HashTable是线程安全的。但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。
HashTable性能差主要是由于所有操作需要竞争同一把锁
原理
容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。这就是ConcurrentHashMap所采用的”分段锁”思想。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。
重要属性
concurrencyLevel:并行级别。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的 。
initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。
loadFactor:负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的。
JDK1.7
ConcurrentHashMap的读取并发,因为在读取的大多数时候都没有用到锁定,所以读取操作几乎是完全的并发操作,而写操作锁定的粒度又非常细,比起之前又更加快速(这一点在桶更多时表现得更明显些)。只有在求size等操作时才需要锁定整个表。而在迭代时,ConcurrentHashMap使用了不同于传统集合的快速失败迭代器的另一种迭代方式,我们称为弱一致迭代器。在这种迭代方式中,当iterator被创建后集合再发生改变就不再是抛出 ConcurrentModificationException,取而代之的是在改变时new新的数据从而不影响原有的数据,iterator完成后再将头指针替换为新的数据,这样iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。
jdk1.7中采用Segment + HashEntry的方式进行实现,结构如下:
ConcurrentHashMap初始化时,计算出Segment数组的大小ssize和每个Segment中HashEntry数组的大小cap,并初始化Segment数组的第一个元素;其中ssize大小为2的幂次方,默认为16,cap大小也是2的幂次方,最小值为2,最终结果根据初始化容量initialCapacity进行计算。
源码分析
ConcurrentHashMap的主干是个Segment数组。
1 final Segment<K,V>[] segments;
Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行操作是不用考虑锁竞争的。(就按默认的ConcurrentLeve为16来讲,理论上就允许16个线程并发执行)
所以,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑。
Segment类似于HashMap,一个Segment维护着一个HashEntry数组
1 transient volatile HashEntry<K,V>[] table;
HashEntry是目前我们提到的最小的逻辑处理单元了。一个ConcurrentHashMap维护一个Segment数组,一个Segment维护一个HashEntry数组。
1 2 3 4 5 6 7 static final class HashEntry <K ,V > { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }
我们说Segment类似哈希表,那么一些属性就跟我们之前提到的HashMap差不离,比如负载因子loadFactor,比如阈值threshold等等,看下Segment的构造方法。
1 2 3 4 5 6 Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this .loadFactor = lf; this .threshold = threshold; this .table = tab; }
我们来看下ConcurrentHashMap的构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0 ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0 ; int ssize = 1 ; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1 ; } this .segmentShift = 32 - sshift; this .segmentMask = ssize - 1 ; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1 ; Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int )(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); this .segments = ss; }
初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity为16,loadFactor为0.75(负载因子,扩容时需要参考),concurrentLevel为16。
从上面的代码可以看出来,Segment数组的大小ssize是由concurrentLevel来决定的,但是却不一定等于concurrentLevel,ssize一定是大于或等于concurrentLevel的最小的2的次幂 。比如:默认情况下concurrentLevel是16,则ssize为16;若concurrentLevel为14,ssize为16;若concurrentLevel为17,则ssize为32。为什么Segment的数组大小一定是2的次幂?其实主要是便于通过按位与的散列算法来定位Segment的index。
当我们用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:
Segment 数组长度为 16,不可以扩容
Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
这里初始化了 segment[0],其他位置还是 null,至于为什么要初始化 segment[0],后面的代码会介绍
当前 segmentShift 的值为 32 – 4 = 28,segmentMask 为 16 – 1 = 15,姑且把它们简单翻译为移位数和掩码
segmentShift和segmentMask
segmentShift和segmentMask这两个全局变量的主要作用是用来定位Segment,int j =(hash >>> segmentShift) & segmentMask。
segmentMask:段掩码,假如segments数组长度为16,则段掩码为16-1=15;segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性
segmentShift:2的sshift次方等于ssize,segmentShift=32-sshift。若segments长度为16,segmentShift=32-4=28;若segments长度为32,segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment。
put
put 的主流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public V put (K key, V value) { Segment<K,V> s; if (value == null ) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null ) s = ensureSegment(j); return s.put(key, hash, value, false ); }
从源码看出,put的主要逻辑也就两步: 1.定位segment并确保定位的Segment已初始化 2.调用Segment的put方法。
接下来定位到Segment上的put方法,Segment中的put方法是要加锁的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 final V put (K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1 ) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null ) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break ; } e = e.next; } else { if (node != null ) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1 ; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null ; break ; } } } finally { unlock(); } return oldValue; }
注意: 1、setEntryAt()操作以实现对链头的延时写,以提升性能,因为此时并不需要将该更新写入到内存,而在锁退出后该更新自然会写入内存。 2、如果scanAndLockForPut()操作返回了一个非空HashEntry,则表示在scanAndLockForPut()遍历key对应节点链时没有找到相应的节点。此时很多时候需要创建新的节点,因而它预创建HashEntry节点,所以不需要再创建,只需要更新它的next指针即可,这里使用setNext()实现延时写也时为了提升性能,因为当前修改并不需要让其他线程知道,在锁退出时修改自然会更新到内存中,如果采用直接赋值给next字段,由于next时volatile字段,会引起更新直接写入内存而增加开销。
接下来,我们说一说其中几步关键的操作。 1、初始化槽: ensureSegment ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。
这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private Segment<K,V> ensureSegment (int k) { final Segment<K,V>[] ss = this .segments; long u = (k << SSHIFT) + SBASE; Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { Segment<K,V> proto = ss[0 ]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int )(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { if (UNSAFE.compareAndSwapObject(ss, u, null , seg = s)) break ; } } } return seg; }
总的来说,ensureSegment(int k) 比较简单,对于并发操作使用 CAS 进行控制。
2、获取写入锁: scanAndLockForPut
当put操作尝试加锁没成功时,它不是直接进入等待状态,而是调用了scanAndLockForPut()操作,该操作持续查找key对应的节点链中是已存在节点,如果没有找到已存在的节点,则预创建一个新节点 ,并且尝试n次,直到尝试次数操作限制,才真正进入等待状态,即所谓的自旋等待。对最大尝试次数,目前的实现单核次数为1,多核为64 。
下面我们来具体分析这个方法中是怎么控制加锁的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 private HashEntry<K,V> scanAndLockForPut (K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this , hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null ; int retries = -1 ; while (!tryLock()) { HashEntry<K,V> f; if (retries < 0 ) { if (e == null ) { if (node == null ) node = new HashEntry<K,V>(hash, key, value, null ); retries = 0 ; } else if (key.equals(e.key)) retries = 0 ; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break ; } else if ((retries & 1 ) == 0 && (f = entryForHash(this , hash)) != first) { e = first = f; retries = -1 ; } } return node; }
这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。
在这段逻辑中,它先获取key对应的节点链的头,然后持续遍历该链,如果节点链中不存在要插入的节点,则预创建一个节点,否则retries值自增,直到操作最大尝试次数而进入等待状态。这里需要注意最后一个else if中的逻辑:当在自旋过程中发现节点链的链头发生了变化,则更新节点链的链头,并重置retries值为-1,重新为尝试获取锁而自旋遍历。
扩容
segment 数组不能扩容 ,扩容是 segment 数组某个位置内部的数组 HashEntry[] 进行扩容,扩容后,容量为原来的 2 倍。 首先,我们要回顾一下触发扩容的地方,put 的时候,如果判断该值的插入会导致该 segment 的元素个数超过阈值,那么先进行扩容,再插值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 private void rehash (HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1 ; threshold = (int )(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1 ; for (int i = 0 ; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null ) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null ) newTable[idx] = e; else { HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null ; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
它创建一个大原来两倍容量的数组,然后遍历原来数组以及数组项中的每条链,对每个节点重新计算它的数组索引,然后创建一个新的节点插入到新数组中,这里需要重新创建一个新节点而不是修改原有节点的next指针是为了在做rehash时可以保证其他线程的get遍历操作可以正常在原有的链上正常工作,有点copy-on-write思想。
然而Doug Lea继续优化了这段逻辑,为了减少重新创建新节点的开销,这里做了两点优化:1,对只有一个节点的链,直接将该节点赋值给新数组对应项即可(之所以能这么做是因为Segment中数组的长度也永远是2的倍数,而将数组长度扩大成原来的2倍,那么新节点在新数组中的位置只能是相同的索引号或者原来索引号加原来数组的长度,因而可以保证每条链在rehash是不会相互干扰);2,对有多个节点的链,先遍历该链找到第一个后面所有节点的索引值不变的节点p,然后只重新创建节点p以前的节点即可,此时新节点链和旧节点链同时存在,在p节点相遇,这样即使有其他线程在当前链做遍历也能正常工作。
get
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public V get (Object key) { Segment<K,V> s; HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null ) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long )(((tab.length - 1 ) & h)) << TSHIFT) + TBASE); e != null ; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null ; }
这个方法是弱一致性的,所以有可能会获取到过时的数据,如果业务场景要求获取数据的强一致性,不建议用这个。
get 的时候在同一个 segment 中发生了 put 或 remove 操作。
put 操作的线程安全性。 初始化槽,这个我们之前就说过了,使用了 CAS 来初始化 Segment 中的数组。 添加节点到链表的操作是插入到表头的,所以,如果这个时候 get 操作在链表遍历的过程已经到了中间,是不会影响的。
当然,另一个并发问题就是 get 操作在 put 之后,需要保证刚刚插入表头的节点被读取,这个依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
扩容。扩容是新创建了数组,然后进行迁移数据,最后面将 newTable 设置给属性 table。所以,如果 get 操作此时也在进行,那么也没关系,如果 get 先行,那么就是在旧的 table 上做查询操作;而 put 先行,那么 put 操作的可见性保证就是 table 使用了 volatile 关键字。
remove 操作的线程安全性。 如果 remove 先破坏了一个节点,分两种情况考虑。 1、如果此节点是头结点,那么需要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,但是 volatile 并不能提供数组内部操作的可见性保证,所以源码中使用了 UNSAFE 来操作数组,请看方法 setEntryAt。2、如果要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。
remove
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 final V remove (Object key, int hash, Object value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null ; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1 ) & hash; HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null ; while (e != null ) { K k; HashEntry<K,V> next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null ) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break ; } pred = e; e = next; } } finally { unlock(); } return oldValue; }
在JDK 1.6版本中,remove操作比较直观,它先找到key对应的节点链的链头(数组中的某个项),然后遍历该节点链,如果在节点链中找到key相等的节点,则为该节点之前的所有节点重新创建节点并组成一条新链,将该新链的链尾指向找到节点的下一个节点。这样如前面rehash提到的,同时有两条链存在,即使有另一个线程正在该链上遍历也不会出问题。
然而Doug Lea又挖掘到了新的优化点,在1.7中,他不再重新创建一条新的链,而是只在当起缓存中将链中找到的节点移除。当移除的是链头则更新数组项的值,否则更新找到节点的前一个节点的next指针。这也是HashEntry中next指针没有设置成final的原因。当然remove操作如果第一次尝试获得锁失败也会如put操作一样先进入自旋状态,这里的scanAndLock和scanAndLockForPut类似,只是它不做预创建节点的步骤。
size实现
因为ConcurrentHashMap是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个Segment对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的,因为在计算后面几个Segment的元素个数时,已经计算过的Segment同时可能有数据的插入或则删除,在1.7的实现中,采用了如下方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 static final int RETRIES_BEFORE_LOCK = 2 ;try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) ensureSegment(j).lock(); } sum = 0L ; size = 0 ; overflow = false ; for (int j = 0 ; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null ) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0 ) overflow = true ; } } if (sum == last) break ; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) segmentAt(segments, j).unlock(); } }
先采用不加锁的方式,连续计算元素的个数,最多计算3次: 1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的; 2、如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数;
1.8
1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,结构如下:
原理
在ConcurrentHashMap中通过一个Node<K,V>[]数组来保存添加到map中的键值对,而在同一个数组位置是通过链表和红黑树的形式来保存的。但是这个数组只有在第一次添加元素的时候才会初始化,否则只是初始化一个ConcurrentHashMap对象的话,只是设定了一个sizeCtl变量,这个变量用来判断对象的一些状态和是否需要扩容。
第一次添加元素的时候,默认初期长度为16,当往map中继续添加元素的时候,通过hash值跟数组长度取与来决定放在数组的哪个位置,如果出现放在同一个位置的时候,优先以链表的形式存放,在同一个位置的个数又达到了8个以上,如果数组的长度还小于64的时候,则会扩容数组。如果数组的长度大于等于64了的话,在会将该节点的链表转换成树。
通过扩容数组的方式来把这些节点给分散开。然后将这些元素复制到扩容后的新的数组中,同一个链表中的元素通过hash值的数组长度位来区分,是还是放在原来的位置还是放到扩容的长度的相同位置去 。在扩容完成之后,如果某个节点的是树,同时现在该节点的个数又小于等于6个了,则会将该树转为链表。
这个时候因为数组的长度才为16,则不会转化为树,而是会进行扩容。扩容后数组大概是这样的:
如果数组扩张后长度达到64了,且继续在某个节点的后面添加元素达到8个以上的时候,则会出现转化为红黑树的情况。转化之后大概是这样:
初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private static final int MAXIMUM_CAPACITY = 1 << 30 ;private static final int DEFAULT_CAPACITY = 16 ;static final int TREEIFY_THRESHOLD = 8 ;static final int UNTREEIFY_THRESHOLD = 6 ;static final int MIN_TREEIFY_CAPACITY = 64 ;static final int MOVED = -1 ; static final int TREEBIN = -2 ; static final int RESERVED = -3 ; static final int HASH_BITS = 0x7fffffff ; transient volatile Node<K,V>[] table;private transient volatile Node<K,V>[] nextTable; private transient volatile int sizeCtl; public ConcurrentHashMap (int initialCapacity) { if (initialCapacity < 0 ) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1 )) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1 ) + 1 )); this .sizeCtl = cap; }
通过提供初始容量,计算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。如 initialCapacity 为 10,那么得到 sizeCtl 为 16,如果 initialCapacity 为 11,得到 sizeCtl 为 32。 在任何一个构造方法中,都没有对存储Map元素Node的table变量进行初始化。而是在第一次put操作的时候在进行初始化。
put
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 public V put (K key, V value) { return putVal(key, value, false ); } final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
只有在执行第一次put方法时才会调用initTable()初始化Node数组,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) Thread.yield(); else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; }
扩容
我们可以看到,在同一个节点的个数超过8个的时候,会调用treeifyBin方法来看看是扩容还是转化为一棵树,同时在每次添加完元素的addCount方法中,也会判断当前数组中的元素是否达到了sizeCtl的量,如果达到了的话,则会进入transfer方法去扩容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private final void treeifyBin (Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null ) { System.out.println("treeifyBin方\t==>数组长:" +tab.length); if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1 ); else if ((b = tabAt(tab, index)) != null && b.hash >= 0 ) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null , tl = null ; for (Node<K,V> e = b; e != null ; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null , null ); if ((p.prev = tl) == null ) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
用链表头部的TreeNode来构造一个TreeBin,在TreeBin容器中,将链表转化为红黑树。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 TreeBin(TreeNode<K,V> b) { super (TREEBIN, null , null , null ); this .first = b; TreeNode<K,V> r = null ; for (TreeNode<K,V> x = b, next; x != null ; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null ; if (r == null ) { x.parent = null ; x.red = false ; r = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null ; for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1 ; else if (ph < h) dir = 1 ; else if ((kc == null && (kc = comparableClassFor(k)) == null ) || (dir = compareComparables(kc, k, pk)) == 0 ) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; if ((p = (dir <= 0 ) ? p.left : p.right) == null ) { x.parent = xp; if (dir <= 0 ) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break ; } } } } this .root = r; assert checkInvariants (root) ; }
可以看到当需要扩容的时候,调用的时候tryPresize方法,看看trePresize的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 private final void tryPresize (int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1 )) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1 ) + 1 ); int sc; while ((sc = sizeCtl) >= 0 ) { Node<K,V>[] tab = table; int n; if (tab == null || (n = tab.length) == 0 ) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) { break ; } else if (tab == table) { int rs = resizeStamp(n); if (sc < 0 ) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) { transfer(tab, null ); } } } }
在tryPresize方法中,并没有加锁,允许多个线程进入,如果数组正在扩张,则当前线程也去帮助扩容。数组扩容的主要方法就是transfer方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 private final void transfer (Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null ) { try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true ; boolean finishing = false ; for (int i = 0 , bound = 0 ;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) { advance = false ; } else if ((nextIndex = transferIndex) <= 0 ) { i = -1 ; advance = false ; } else if (U.compareAndSwapInt (this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } if (U.compareAndSwapInt(this , SIZECTL, sc = sizeCtl, sc - 1 )) { if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { return ; } finishing = advance = true ; i = n; } } else if ((f = tabAt(tab, i)) == null ) advance = casTabAt(tab, i, null , fwd); else if ((fh = f.hash) == MOVED) advance = true ; else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null , loTail = null ; TreeNode<K,V> hi = null , hiTail = null ; int lc = 0 , hc = 0 ; for (Node<K,V> e = t.first; e != null ; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null , null ); if ((h & n) == 0 ) { if ((p.prev = loTail) == null ) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null ) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0 ) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0 ) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } } } } } }
下面的两点一定要注意:
·复制之后的新链表不是旧链表的绝对倒序。
·在扩容的时候每个线程都有处理的步长,最少为16,在这个步长范围内的数组节点只有自己一个线程来处理
size实现
1.8中使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); }
1、初始化时counterCells为空,在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化;
2、如果CounterCell数组counterCells为空,调用fullAddCount()方法进行初始化,并插入对应的记录数,通过CAS设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell数组,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { boolean init = false ; try { if (counterCells == as) { CounterCell[] rs = new CounterCell[2 ]; rs[h & 1 ] = new CounterCell(x); counterCells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; }
3、如果通过CAS设置cellsBusy字段失败的话,则继续尝试通过CAS修改baseCount字段,如果修改baseCount字段成功的话,就退出循环,否则继续循环插入CounterCell对象;
1 2 else if (U.compareAndSwapLong(this , BASECOUNT, v = baseCount, v + x)) break ;
所以在1.8中的size实现比1.7简单多,因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public int size () { long n = sumCount(); return ((n < 0L ) ? 0 : (n > (long )Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int )n); } final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
通过累加baseCount和CounterCell数组中的数量,即可得到元素的总个数;
get
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
同步机制
ConcurrentHashMap是如果来做到并发安全,又是如何做到高效的并发的呢?
首先是读操作,从源码中可以看出来,在get操作中,根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的。
1、那么写操作呢?
分析这个之前,先看看什么情况下会引起数组的扩容,扩容是通过transfer方法来进行的。而调用transfer方法的只有trePresize、helpTransfer和addCount三个方法。
这三个方法又是分别在什么情况下进行调用的呢?
tryPresize是在treeIfybin和putAll方法中调用,treeIfybin主要是在put添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树。
helpTransfer是在当一个线程要对table中元素进行操作的时候,如果检测到节点的HASH值为MOVED的时候,就会调用helpTransfer方法,在helpTransfer中再调用transfer方法来帮助完成数组的扩容
addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。
所以引起数组扩容的情况如下:
那么在扩容的时候,可以不可以对数组进行读写操作呢?
事实上是可以的。当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为fwd节点),那就可以进行设置操作。 如果该节点已经被处理了,则当前线程也会加入到扩容的操作中去。
那么,多个线程又是如何同步处理的呢?
在ConcurrentHashMap中,同步处理主要是通过Synchronized和unsafe两种方式来完成的。
在取得sizeCtl、某个位置的Node的时候,使用的都是unsafe的方法,来达到并发安全的目的
当需要在某个位置设置节点的时候,则会通过Synchronized的同步机制来锁定该位置的节点。
在数组扩容的时候,则通过处理的步长和fwd节点来达到并发安全的目的,通过设置hash值为MOVED
当把某个位置的节点复制到扩张后的table的时候,也通过Synchronized的同步机制来保证现程安全
思考
在put中获取链表的头节点时,为什么不直接用tab[i]?
1 2 3 4 5 6 HashEntry<K,V>[] tab = table; int index = (tab.length - 1 ) & hash;HashEntry<K,V> first = entryAt(tab, index);
将volatile的table字段引用赋值给tab局部变量,为了保证每次读取的table中的数组项都是最新的值,因而调用entryAt()方法获取数组项的值而不是通过tab[index]方式直接获取(在put操作更新节点链时,它采用Unsafe.putOrderedObject()操作,此时它对链头的更新只局限与当前线程,为了保证接下来的put操作能够读取到上一次的更新结果,需要使用volatile的语法去读取节点链的链头)。 entryAt方法:
1 2 3 4 5 static final <K,V> HashEntry<K,V> entryAt (HashEntry<K,V>[] tab, int i) { return (tab == null ) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long )i << TSHIFT) + TBASE); }
setEntryAt方法:
1 2 3 4 static final <K,V> void setEntryAt (HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { UNSAFE.putOrderedObject(tab, ((long )i << TSHIFT) + TBASE, e); }
1.8synchronized用在哪些地方
1、put的时候,如果在table[i]上有元素的话,就采用synchronized的方式加锁 2、在transfer扩容的时候,使用synchronized重复对table[i]上的第一个元素加锁 3、链表转树的过程也是一样
参考资料