ConcurrentHashMap原理剖析51CTO博客 - AG环亚娱乐

ConcurrentHashMap原理剖析51CTO博客

2019-01-03 10:42:20 | 作者: 景行 | 标签: 操作,一个,这个 | 浏览: 2651

HashTable是一个线程安全的类,它运用synchronized来锁住整张Hash表来完成线程安全,即每次锁住整张表让线程独占。ConcurrentHashMap答应多个修正操作并发进行,其关键在于运用了锁别离技能。它运用了多个锁来操控对hash表的不同部分进行的修正。ConcurrentHashMap内部运用段(Segment)来标明这些不同的部分,每个段其实就是一个小的Hashtable,它们有自己的锁。只需多个修正操作发生在不同的段上,它们就能够并发进行。

有些方法需求跨段,比方size()和containsValue(),它们或许需求断定整个表而而不只仅是某个段,这需求按次序断定一切段,操作完毕后,又按次序开释一切段的锁。这儿“按次序”是很重要的,不然极有或许呈现死锁,在ConcurrentHashMap内部,段数组是final的,而且其成员变量实际上也是final的,可是,仅仅是将数组声明为final的并不确保数组成员也是final的,这需求完成上的确保。这能够确保不会呈现死锁,由于取得锁的次序是固定的。

1. 完成原理

ConcurrentHashMap运用分段锁技能,将数据分红一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁拜访其间一个段数据的时分,其他段的数据也能被其他线程拜访,能够完成真实的并发拜访。如下图是ConcurrentHashMap的内部结构图:

 

从图中能够看到,ConcurrentHashMap内部分为很多个Segment,每一个Segment具有一把锁,然后每个Segment(承继ReentrantLock)

static final class Segment<K,V> extends ReentrantLock implements Serializable

 

Segment承继了ReentrantLock,标明每个segment都能够作为一个锁。(ReentrantLock前文现已说到,不了解的话就把作为synchronized的替代者吧)这样对每个segment中的数据需求同步操作的话都是运用每个segment容器目标本身的锁来完成。只要对大局需求改动时断定的是一切的segment。

Segment下面包括很多个HashEntry列表数组。关于一个key,需求经过三次(为什么要hash三次下文会具体解说)hash操作,才干终究定位这个元素的方位,这三次hash别离为:

  1. 关于一个key,先进行一次hash操作,得到hash值h1,也即h1 = hash1(key);

  2. 将得到的h1的高几位进行第2次hash,得到hash值h2,也即h2 = hash2(h1高几位),经过h2能够断定该元素的放在哪个Segment;

  3. 将得到的h1进行第三次hash,得到hash值h3,也即h3 = hash3(h1),经过h3能够断定该元素放置在哪个HashEntry。

ConcurrentHashMap中首要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图能够看出之间的联系

 

/** * The segments, each of which is a specialized hash table 
*/  final Segment<K,V>[] segments;

 

不变(Immutable)和易变(Volatile)ConcurrentHashMap彻底答应多个读操作并发进行,读操作并不需求加锁。假设运用传统的技能,如HashMap中的完成,假设答应能够在hash链的中心增加或删去元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap完成技能是确保HashEntry几乎是不行变的。HashEntry代表每个hash链中的一个节点,其结构如下所示:

1 static final class HashEntry<K,V> {  
2      final K key;  
3      final int hash;  
4      volatile V value;  
5      volatile HashEntry<K,V> next;  
6  }

 

在JDK 1.6中,HashEntry中的next指针也界说为final,而且每次刺进将新增加节点作为链的头节点(同HashMap完成),而且每次删去一个节点时,会将删去节点之前的一切节点 复制一份组成一个新的链,而将当时节点的上一个节点的next指向当时节点的下一个节点,然后在删去今后 有两条链存在,因此能够确保即便在同一条链中,有一个线程在删去,而另一个线程在遍历,它们都能作业杰出,由于遍历的线程能持续运用原有的链。因此这种完成是一种愈加细粒度的happens-before联系,即假设遍历线程在删去线程完毕后开端,则它能看到删去后的改动,假设它发生在删去线程正在履行中心,则它会运用原有的链,而不会比及删去线程完毕后再履行,即看不到删去线程的影响。假设这不契合你的需求,仍是乖乖的用Hashtable或HashMap的synchronized版别,Collections.synchronizedMap()做的包装。

而HashMap中的Entry只要key是final的

1 static class Entry<K,V> implements Map.Entry<K,V> {2         final K key;3         V value;4         Entry<K,V> next;5         int hash;

 

不变形式(immutable)是多线程安全里最简略的一种确保方法。由于你拿他没有方法,想改动它也没有时机。
不变形式首要经过final关键字来限制的。在JMM中final关键字还有特别的语义。Final域使得确保初始化安全性(initialization safety)成为或许,初始化安全性让不行变形目标不需求同步就能自在地被拜访和同享。

1.1 初始化

先看看ConcurrentHashMap的初始化做了哪些工作,结构函数的源码如下:

 1 public ConcurrentHashMap(int initialCapacity, 2                              float loadFactor, int concurrencyLevel) { 3         if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 4             throw new IllegalArgumentException(); 5         if (concurrencyLevel > MAX_SEGMENTS) 6             concurrencyLevel = MAX_SEGMENTS; 7         // Find power-of-two sizes best matching arguments 8         int sshift = 0; 9         int ssize = 1;10         while (ssize < concurrencyLevel) {11             ++sshift;12             ssize <<= 1;13         }14         this.segmentShift = 32 - sshift;15         this.segmentMask = ssize - 1;16         if (initialCapacity > MAXIMUM_CAPACITY)17             initialCapacity = MAXIMUM_CAPACITY;18         int c = initialCapacity / ssize;19         if (c * ssize < initialCapacity)20             ++c;21         int cap = MIN_SEGMENT_TABLE_CAPACITY;22         while (cap < c)23             cap <<= 1;24         // create segments and segments[0]25         Segment<K,V> s0 =26             new Segment<K,V>(loadFactor, (int)(cap * loadFactor),27                              (HashEntry<K,V>[])new HashEntry[cap]);28         Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];29         UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]30         this.segments = ss;31     }

 

传入的参数有initialCapacity,loadFactor,concurrencyLevel这三个。

  • initialCapacity标明新创立的这个ConcurrentHashMap的初始容量,也就是上面的结构图中的Entry数量。默认值为static final int DEFAULT_INITIAL_CAPACITY = 16;

  • loadFactor标明负载因子,就是当ConcurrentHashMap中的元素个数大于loadFactor * 最大容量时就需求rehash,扩容。默认值为static final float DEFAULT_LOAD_FACTOR = 0.75f;

  • concurrencyLevel标明并发等级,这个值用来断定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。比方,假设concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16;。抱负情况下ConcurrentHashMap的真实的并发拜访量能够到达concurrencyLevel,由于有concurrencyLevel个Segment,假设有concurrencyLevel个线程需求拜访Map,而且需求拜访的数据都刚好别离落在不同的Segment中,则这些线程能够无竞赛地自在拜访(由于他们不需求竞赛同一把锁),到达一起拜访的作用。这也是为什么这个参数起名为“并发等级”的原因。

初始化的一些动作:

  1. 验证参数的合法性,假设不合法,直接抛出反常。

  2. concurrencyLevel也就是Segment的个数不能超越规则的最大Segment的个数,默认值为static final int MAX_SEGMENTS = 1 << 16;,假设超越这个值,设置为这个值。

  3. 然后运用循环找到大于等于concurrencyLevel的第一个2的n次方的数ssize,这个数就是Segment数组的巨细,并记载总共向左按位移动的次数sshift,并令segmentShift = 32 - sshift,而且segmentMask的值等于ssize - 1,segmentMask的各个二进制位都为1,意图是之后能够经过key的hash值与这个值做&运算断定Segment的索引。

  4. 查看给的容量值是否大于答应的最大容量值,假设大于该值,设置为该值。最大容量值为static final int MAXIMUM_CAPACITY = 1 << 30;。

  5. 然后核算每个Segment均匀应该放置多少个元素,这个值c是向上取整的值。比方初始容量为15,Segment个数为4,则每个Segment均匀需求放置4个元素。

  6. 最终创立一个Segment实例,将其作为Segment数组的第一个元素。

1.2 put操作

put操作的源码如下:

 1 public V put(K key, V value) { 2       Segment<K,V> s; 3       if (value == null) 4           throw new NullPointerException(); 5       int hash = hash(key); 6       int j = (hash >>> segmentShift) & segmentMask; 7       if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck 8            (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment 9           s = ensureSegment(j);10       return s.put(key, hash, value, false);11   }

 

操作过程如下:

    1. 判别value是否为null,假设为null,直接抛出反常。

    2. key经过一次hash运算得到一个hash值。(这个hash运算下文详说)

    3. 将得到hash值向右按位移动segmentShift位,然后再与segmentMask做&运算得到segment的索引j。
      在初始化的时分咱们说过segmentShift的值等于32-sshift,例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个姿态:
      0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用这个值与segmentMask做&运算,也就是取最终四位的值。这个值断定Segment的索引。

    4. 运用Unsafe的方法从Segment数组中获取该索引对应的Segment目标。

    5. 向这个Segment目标中put值,这个put操作也基本是相同的过程(经过&运算获取HashEntry的索引,然后set)。

 1 final V put(K key, int hash, V value, boolean onlyIfAbsent) { 2             HashEntry<K,V> node = tryLock() ? null : 3                 scanAndLockForPut(key, hash, value); 4             V oldValue; 5             try { 6                 HashEntry<K,V>[] tab = table; 7                 int index = (tab.length - 1) & hash; 8                 HashEntry<K,V> first = entryAt(tab, index); 9                 for (HashEntry<K,V> e = first;;) {10                     if (e != null) {11                         K k;12                         if ((k = e.key) == key ||13                             (e.hash == hash && key.equals(k))) {14                             oldValue = e.value;15                             if (!onlyIfAbsent) {16                                 e.value = value;17                                 ++modCount;18                             }19                             break;20                         }21                         e = e.next;22                     }23                     else {24                         if (node != null)25                             node.setNext(first);26                         else27                             node = new HashEntry<K,V>(hash, key, value, first);28                         int c = count + 1;29                         if (c > threshold && tab.length < MAXIMUM_CAPACITY)30                             rehash(node);31                         else32                             setEntryAt(tab, index, node);33                         ++modCount;34                         count = c;35                         oldValue = null;36                         break;37                     }38                 }39             } finally {40                 unlock();41             }42             return oldValue;43         }

 

put操作是要加锁的。

1.3 get操作

get操作的源码如下:

 1 public V get(Object key) { 2         Segment<K,V> s; // manually integrate access methods to reduce overhead 3         HashEntry<K,V>[] tab; 4         int h = hash(key); 5         long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; 6         if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && 7             (tab = s.table) != null) { 8             for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile 9                      (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);10                  e != null; e = e.next) {11                 K k;12                 if ((k = e.key) == key || (e.hash == h && key.equals(k)))13                     return e.value;14             }15         }16         return null;17     }

 

操作过程为:

  1. 和put操作相同,先经过key进行两次hash断定应该去哪个Segment中取数据。

  2. 运用Unsafe获取对应的Segment,然后再进行一次&运算得到HashEntry链表的方位,然后从链表头开端遍历整个链表(由于Hash或许会有磕碰,所以用一个链表保存),假设找到对应的key,则回来对应的value值,假设链表遍历完都没有找到对应的key,则阐明Map中不包括该key,回来null。

值得注意的是,get操作是不需求加锁的(假设value为null,会调用readValueUnderLock,只要这个过程会加锁),经过前面说到的volatile和final来确保数据安全。

1.4 size操作

size操作与put和get操作最大的差异在于,size操作需求遍历一切的Segment才干算出整个Map的巨细,而put和get都只关怀一个Segment。假定咱们当时遍历的Segment为SA,那么在遍历SA过程中其他的Segment比方SB或许会被修正,所以这一次运算出来的size值或许并不是Map当时的真实巨细。所以一个比较简略的方法就是核算Map巨细的时分一切的Segment都Lock住,不能更新(包括put,remove等等)数据,核算完之后再Unlock。这是普通人能够想到的计划,可是牛逼的作者还有一个更好的Idea:先给3次时机,不lock一切的Segment,遍历一切Segment,累加各个Segment的巨细得到整个Map的巨细,假设某相邻的两次核算获取的一切Segment的更新的次数(每个Segment都有一个modCount变量,这个变量在Segment中的Entry被修正时会加一,经过这个值能够得到每个Segment的更新操作的次数)是相同的,阐明核算过程中没有更新操作,则直接回来这个值。假设这三次不加锁的核算过程中Map的更新次数有改动,则之后的核算先对一切的Segment加锁,再遍历一切Segment核算Map巨细,最终再解锁一切Segment。源代码如下:

 1 public int size() { 2         // Try a few times to get accurate count. On failure due to 3         // continuous async changes in table, resort to locking. 4         final Segment<K,V>[] segments = this.segments; 5         int size; 6         boolean overflow; // true if size overflows 32 bits 7         long sum;         // sum of modCounts 8         long last = 0L;   // previous sum 9         int retries = -1; // first iteration isnt retry10         try {11             for (;;) {12                 if (retries++ == RETRIES_BEFORE_LOCK) {13                     for (int j = 0; j < segments.length; ++j)14                         ensureSegment(j).lock(); // force creation15                 }16                 sum = 0L;17                 size = 0;18                 overflow = false;19                 for (int j = 0; j < segments.length; ++j) {20                     Segment<K,V> seg = segmentAt(segments, j);21                     if (seg != null) {22                         sum += seg.modCount;23                         int c = seg.count;24                         if (c < 0 || (size += c) < 0)25                             overflow = true;26                     }27                 }28                 if (sum == last)29                     break;30                 last = sum;31             }32         } finally {33             if (retries > RETRIES_BEFORE_LOCK) {34                 for (int j = 0; j < segments.length; ++j)35                     segmentAt(segments, j).unlock();36             }37         }38         return overflow ? Integer.MAX_VALUE : size;39     }

 

举个比方:

1.5 containsValue操作

containsValue操作采用了和size操作相同的主意:

 1 public boolean containsValue(Object value) { 2         // Same idea as size() 3         if (value == null) 4             throw new NullPointerException(); 5         final Segment<K,V>[] segments = this.segments; 6         boolean found = false; 7         long last = 0; 8         int retries = -1; 9         try {10             outer: for (;;) {11                 if (retries++ == RETRIES_BEFORE_LOCK) {12                     for (int j = 0; j < segments.length; ++j)13                         ensureSegment(j).lock(); // force creation14                 }15                 long hashSum = 0L;16                 int sum = 0;17                 for (int j = 0; j < segments.length; ++j) {18                     HashEntry<K,V>[] tab;19                     Segment<K,V> seg = segmentAt(segments, j);20                     if (seg != null && (tab = seg.table) != null) {21                         for (int i = 0 ; i < tab.length; i++) {22                             HashEntry<K,V> e;23                             for (e = entryAt(tab, i); e != null; e = e.next) {24                                 V v = e.value;25                                 if (v != null && value.equals(v)) {26                                     found = true;27                                     break outer;28                                 }29                             }30                         }31                         sum += seg.modCount;32                     }33                 }34                 if (retries > 0 && sum == last)35                     break;36                 last = sum;37             }38         } finally {39             if (retries > RETRIES_BEFORE_LOCK) {40                 for (int j = 0; j < segments.length; ++j)41                     segmentAt(segments, j).unlock();42             }43         }44         return found;45     }

2. 关于hash

看看hash的源代码:

 1 private int hash(Object k) { 2         int h = hashSeed; 3  4         if ((0 != h) && (k instanceof String)) { 5             return sun.misc.Hashing.stringHash32((String) k); 6         } 7  8         h ^= k.hashCode(); 9 10         // Spread bits to regularize both segment and index locations,11         // using variant of single-word Wang/Jenkins hash.12         h += (h <<  15) ^ 0xffffcd7d;13         h ^= (h >>> 10);14         h += (h <<   3);15         h ^= (h >>>  6);16         h += (h <<   2) + (h << 14);17         return h ^ (h >>> 16);18     }

 

源码中的注释是这样的:

这儿用到了Wang/Jenkins hash算法的变种,首要的意图是为了削减哈希抵触,使元素能够均匀的散布在不同的Segment上,然后进步容器的存取功率。假设哈希的质量差到极点,那么一切的元素都在一个Segment中,不只存取元素缓慢,分段锁也会失掉含义。

举个简略的比方:

1 System.out.println(Integer.parseInt("0001111", 2) & 15);2 System.out.println(Integer.parseInt("0011111", 2) & 15);3 System.out.println(Integer.parseInt("0111111", 2) & 15);4 System.out.println(Integer.parseInt("1111111", 2) & 15);

这些数字得到的hash值都是相同的,满是15,所以假设不进行第一次预hash,发生抵触的几率仍是很大的,可是假设咱们先把上例中的二进制数字运用hash()函数先进行一次预hash,得到的成果是这样的:

 

上面这个比方引证自:  InfoQ

能够看到每一位的数据都散开了,而且ConcurrentHashMap中是运用预hash值的高位参加运算的。比方之前说的先将hash值向右按位移动28位,再与15做&运算,得到的成果都别为:4,15,7,8,没有抵触!

3. 注意事项
  • ConcurrentHashMap中的key和value值都不能为null,HashMap中key能够为null,HashTable中key不能为null。

  • ConcurrentHashMap是线程安全的类并不能确保运用了ConcurrentHashMap的操作都是线程安全的!

  • ConcurrentHashMap的get操作不需求加锁,put操作需求加锁


版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表AG环亚娱乐立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章