HashMap 作為我們平日里 java 開發(fā)使用最多的集合,本篇文章要介紹的 ConcurrentHashMap 是前者的升級,也許有些小伙伴有聽說過。本篇文章將帶大家簡單地了解一下 java 的核心基礎(chǔ) ConcurrentHashMap 的知識內(nèi)容。
1 前言
ConcurrentHashMap是基于Hash表的Map接口實現(xiàn),鍵與值均不允許為NULL,他是一個線程安全的Map。同時他也是一個無序的Map,不同時間進行遍歷可能會得到不同的順序。在JDK1.8之前,ConcurrentHashMap使用分段鎖以在保證線程安全的同時獲得更大的效率。JDK1.8開始舍棄了分段鎖,使用自旋+CAS+sync關(guān)鍵字來實現(xiàn)同步。本文所述便是基于JDK1.8。
ConcurrentHashMap與HashMap有共同之處,一些HashMap的基本概念與實現(xiàn),本文不再贅述。
2 繼承關(guān)系
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable
可以看到ConcurrentHashMap繼承了AbstractMap及ConcurrentMap抽象類,并實現(xiàn)了Serializable接口,這說明ConcurrentHashMap是一個線程安全的標(biāo)準(zhǔn)Map,且允許序列化。與HashMap不同是ConcurrentHashMap不允許Clone。
3 構(gòu)造方法
ConcurrentHashMap同樣是采用懶初始化的方式,有實際元素時才進行容器的初始化。因此其構(gòu)造方法與HashMap相差無幾。
// 無參構(gòu)造
public ConcurrentHashMap() {
}
// 帶有初始容量的構(gòu)造
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); //求最小二次冪
this.sizeCtl = cap; // sizeCtl 暫存容量
}
// 帶有初始集合的構(gòu)造
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
// 指定初始容量和裝載因子的構(gòu)造
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
以上構(gòu)造并沒有什么特別的,邏輯也相對簡單,不再詳細解析,感興趣的話可以到前言提到的HashMap篇了解。
除此之外,ConcurrentHashMap擁有另外一個值得注意的構(gòu)造方法:
指定初始容量,裝載因子以及并發(fā)級別的構(gòu)造方法:
源碼:
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 基礎(chǔ)校驗
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // 取初始容量和并發(fā)級別的最大值
initialCapacity = concurrencyLevel;
long size = (long)(1.0 + (long)initialCapacity / loadFactor); // 指定了裝載因子,因此就用初始容量和裝載因子計算出實際要的初始容量
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size); // 確定計算出初始容量,
this.sizeCtl = cap;
}
解析: 除了初始容量與裝載因子外,此構(gòu)造方法還有一個并發(fā)級別concurrencyLevel
的參數(shù)。在jdk1.7時,并發(fā)級別作為分段鎖的標(biāo)準(zhǔn)進行分段。但是jdk1.8開始舍棄了分段鎖,為了版本兼容,此構(gòu)造方法依然存在,但是concurrencyLevel
也不再具有其分段依據(jù)的意義,而是作為初始容量的定義依據(jù)。
4 初始化
ConcurrentHashMap采用懶初始化的方式,在第一次putAvl
時如果容器為空,則會調(diào)用initTable()
進行容器的初始化:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { // tab為空時就一直循環(huán)
if ((sc = sizeCtl) < 0) // sc小于0說明正在初始化或者正在復(fù)制,放棄CPU等下一次
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 否則的話就用CAS把sizectl設(shè)置成-1,標(biāo)識正在初始化,且成功的話
try {
// CAS之后再判斷一下,這是為了避免并發(fā),比如上面判斷了tab為空,然后另一個線程做了初始化操作,結(jié)束時sc被設(shè)置為擴容閾值,然后繼續(xù)(sc = sizeCtl) < 0,這時cs還是大于0,所以還是能走進來的,所以這里再判斷一下
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // sc最初始是存在的初始容量的
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 建立一個容器
table = tab = nt; // 然后復(fù)制到table屬性上
sc = n - (n >>> 2); // sc等于 n>>>2就是n/4就是0.25n sc = 0.75n,相當(dāng)于設(shè)置擴容閾值
}
} finally {
sizeCtl = sc; // sizeCtl賦值成擴容閾值
}
break; // 只要走到這里面,那么說明一定已經(jīng)初始化結(jié)束了,無論初始化是自己做的還是其他線程做的,然后就跳出
}
}
return tab; // 返回新的tab
}
解析: 容器初始化完成之前不斷的進行循環(huán)。這是因為ConcurrentHashMap是一個支持并發(fā)的Map,可能同時會有多個線程進入initTable
方法,但是只有一個線程執(zhí)行初始化操作,那么剩下的線程就需要等待初始化完成再跳出initTable
方法,以滿足接下來的putVal
操作。
為了保證只有一個線程執(zhí)行初始化操作,使用sizeCtl
來作為標(biāo)識,sizeCtl
為-1時即說明當(dāng)前已有線程正在初始化,則放棄CPU繼續(xù)循環(huán)。sizeCtl
不為負數(shù)時,則使用CAS(通過Unsafe+偏移量的方式實現(xiàn))將sizeCtl
置為-1,如果當(dāng)前線程成功的話則進入實際的擴容操作。
通過CAS鎖定成功后再次判斷容器是否為空,這是為了避免并發(fā),比如上面判斷了tab為空,然后另一個線程做了初始化操作,結(jié)束時sc被設(shè)置為擴容閾值,然后繼續(xù)(sc = sizeCtl) < 0,
這時cs還是大于0,所以還是能走進來的,所以這里再判斷一下。
如果sc(sizeCtl原值)大于0,則以sc做為初始容量。這個在構(gòu)造方法篇提到過,對于有初始容量要求的構(gòu)造,會以sizeCtl
暫存初始容量。否則的話就取默認的初始容量DEFAULT_CAPACITY(16)。
接下來就建立目標(biāo)容量的容器并賦值給容器屬性table
,計算該容量下的擴容閾值賦值給sizeCtl
。sizeCtl
的賦值放到finally里面的原因是因為無論容器創(chuàng)建成功還是失敗,都需要放開以sizeCtl
為負值作為判斷條件的鎖,以保證在本線程創(chuàng)建失敗的情況其它線程能繼續(xù)競爭鎖繼續(xù)進行容器創(chuàng)建的工作。
至此,容器的初始化便完成了。
5 新數(shù)據(jù)載入-putVal
ConcurrentHashMap新數(shù)據(jù)載入主要通過putVal
實現(xiàn):
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) { // 會一直循環(huán),直到break
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // unsafe獲取i處的數(shù)據(jù),如果為null
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) // cas操作,如果i處為null就構(gòu)造新元素,成功就break 失敗就繼續(xù)循環(huán)
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // f處的元素hash是-1 -1意味著這是一個遷移節(jié)點
tab = helpTransfer(tab, f); // 輔助遷移開始
else {
V oldVal = null;
synchronized (f) { // 用f進行同步
if (tabAt(tab, i) == f) { // 這里要保證i處還是f,而不是已經(jīng)被其它元素并發(fā)占據(jù)了
if (fh >= 0) { // f處的hash要大于等于0,這是為什么呢?除了MOVED還會有其它負值嗎
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 從e往后找,并且記錄元素數(shù)量
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 一系列判斷Key是否相等,如果相等
oldVal = e.val; // 記錄舊值
if (!onlyIfAbsent) // 不是僅允許新增的話
e.val = value; // 記錄新值
break; // 成功的話就跳出大循環(huán)
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // e向后移動, 如果移動之后是nul,就是在末尾的話
pred.next = new Node<K,V>(hash, key,
value, null); // 建立一個新節(jié)點
break; // 成功的話就跳出大循環(huán)
}
}
}
else if (f instanceof TreeBin) { // 如果是一個樹化節(jié)點
Node<K,V> p;
binCount = 2; // 樹化的節(jié)點的話固定元素數(shù)量為2,這是一個相對特殊的值,即會擴容又不會重復(fù)樹化
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) { // 往樹里面新增元素,如果存在就返回那個節(jié)點,不存在就是新增節(jié)點
oldVal = p.val; // 記錄舊值
if (!onlyIfAbsent) // 不是僅允許新增的話
p.val = value; // 記錄新值
}
}
}
}
if (binCount != 0) { // 如果元素數(shù)不為0的話
if (binCount >= TREEIFY_THRESHOLD) // 判斷是否到了樹化閾值
treeifyBin(tab, i); // 到了的話進行樹化
if (oldVal != null) // 覆蓋操作直接返回舊值不會執(zhí)行addCount
return oldVal;
break; // binCount != 0才跳出大循環(huán),為0是tabAt(tab, i) != f導(dǎo)致的,即i處已經(jīng)被其他線程覆蓋了
}
}
}
addCount(1L, binCount); //
return null;
}
解析: 完成為空校驗后,通過spread方法來計算出hash以確定下標(biāo)位置,spread的計算方式為(h ^ (h >>> 16)) & HASH_BITS
,HASH_BITS的值為0x7fffffff
,描述出來就是將hashCode的高16位和低16位做異或操作,并保證最高位符號位為0(結(jié)果是一個正數(shù)),注解表示這樣做的原因是為了使數(shù)據(jù)更加分散,盡可能的避免hash沖突。
如果容器tab為空或者長度為0說明容器未初始化,那么就調(diào)用initTable進行容器初始化。
否則的話對hash與現(xiàn)容量進行與運算得出數(shù)據(jù)應(yīng)處的下標(biāo),判斷此下標(biāo)所在處節(jié)點是否為空,為空說明沒有元素,直接創(chuàng)建一個新節(jié)點作為頭元素放進去,這一步通過CAS操作實現(xiàn),避免并發(fā)情況下另外的線程先一步完成頭節(jié)點創(chuàng)建操作。
如果下標(biāo)所在處節(jié)點不為空,說明該處已經(jīng)有元素了,此時判斷頭節(jié)點的hash是否為MOVED,MOVED的值為-1。上面提到過,正常元素通過spread
方法計算出來的hash值都會使正數(shù)。此處-1為一個特殊值,意味著此節(jié)點正在進行擴容遷移工作,那么此時就調(diào)用8 輔助擴容-helpTransfer方法進行輔助遷移工作。
如果節(jié)點hash不為MOVED,意味著這是一個正常節(jié)點,就執(zhí)行元素載入工作,使用synchronized關(guān)鍵字實現(xiàn)同步, 同步塊內(nèi)開始還要使用tabAt(tab, i) == f
判斷進入同步后i處的節(jié)點還是原節(jié)點,接下來就是元素的載入工作了,整體和HashMap的流程是相差無幾的,感興趣的話可以去文首提到的解析HashMap的文章了解一下。synchronized塊將元素載入節(jié)點后,接著會對binCount
進行判斷,binCount
在節(jié)點還處于鏈表模式下的情況下記錄節(jié)點在新元素載入后的元素總數(shù)量,此處判斷binCount
達到TREEIFY_THRESHOLD(8) 樹化閾值后,就會對該節(jié)點進行樹化操作。樹化操作依然是通過synchronized來完成的,這里不做過多延伸。
以上整個新元素載入操作都是一個自旋的過程,一直到新元素載入成功后通過break跳出自旋過程。
新元素載入節(jié)點后,需要對count實時維護,count通過 6 維護與啟動擴容-addCount 方法完成同步維護工作。
6 維護與啟動擴容-addCount
ConcurrentHashMap通過addCount
方法實時維護內(nèi)部元素數(shù)量,并在達到擴容閾值的情況下啟動擴容操作:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null || // counterCells不為空,如果沒經(jīng)歷過并發(fā)肯定是空的,這個判斷意思是維護count的過程中已經(jīng)經(jīng)歷過并發(fā),已經(jīng)處于不使用baseCount維護的階段了
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //這個比較唯一會出現(xiàn)false的情況就是b = baseCount執(zhí)行之后,還沒進行cas時,baseCount又被改了。意思是如果沒有并發(fā)就用baseCount維護,并發(fā)導(dǎo)致baseCount維護失敗就進入counterCells維護
CounterCell a; long v; int m;
boolean uncontended = true; // 記錄是否遭遇了并發(fā)
if (as == null || (m = as.length - 1) < 0 || // as等于null意味著baseCount維護遭遇并發(fā)了,且是初次(非初次counterCells不會為null) (m = as.length - 1) < 0說明counterCells長度為0,即沒有實際內(nèi)容
(a = as[ThreadLocalRandom.getProbe() & m]) == null || // counterCells是一個分段計算的概念,這里就是隨機找一個槽,如果這個槽位空的話
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 槽不為空的話就用cas把槽里面的值賦值給v+a.val 如果失敗的話
fullAddCount(x, uncontended); // 用fullAddCount 就是LongAdder的模式將x放進去,初始化counterCells
return;
}
if (check <= 1) // check小于等于1(1表示是當(dāng)前節(jié)點的第一個元素,-1標(biāo)識是replace方法或者clear過來的,這些都不需要擴容)
return;
s = sumCount(); // 計算一下現(xiàn)在的總量
}
if (check >= 0) { //baseCount成功的情況且 check大于0,即不是replace或者clear方法進來的
Node<K,V>[] tab, nt; int n, sc;
/**
1.s >= (long)(sc = sizeCtl) s是總數(shù)量 s大于等于擴容閾值sizeCtl時,或者sizeCtl為負數(shù),正在擴容時
2.(tab = table) != null 并且容器tab不為空,意思就是表已經(jīng)初始化完畢了,是執(zhí)行擴容而不是執(zhí)行初始化
3. (n = tab.length) < MAXIMUM_CAPACITY) 并且容量還沒有達到最大容量時
*/
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n); // 獲得容量特征值:容量n的左0數(shù)量并將高位置1
if (sc < 0) { // 如果sc小于0 意味這個正在進行并發(fā)擴容
/**
sc >>> RESIZE_STAMP_SHIFT != rs是判斷現(xiàn)在的容量與sc中記錄的長度是不是不一樣長,不一樣長的話說明不是同一長度的擴容,就要break-保證是同等長度的擴容
sc == rs + 1 這個地方在看來是一個bug,如果已經(jīng)處于擴容中,sc目前處于高16位是容量特征值,低16位標(biāo)識擴容線程量特征值的狀態(tài),而此時的rs是容量特征值的狀態(tài),也就是目前sc的高16位,這里原意應(yīng)該是判斷目前是不是只剩下一個非初始擴容線程
,為什么這么說呢,因為初始擴容線程會把sc設(shè)置稱高16位是容量特征值即rs,低16位標(biāo)識擴容線程量特征值(初始擴容線程+2,非初始擴容線程+1),這一步判斷的就是如過只剩下一個非初始擴容線程了,說明初始擴容線程已經(jīng)結(jié)束,現(xiàn)在已經(jīng)進入擴容的最后階段了,就不需要再繼續(xù)進行輔助擴容了
,正確的方式應(yīng)該是sc == (rs << RESIZE_STAMP_SHIFT) + 1
sc == rs + MAX_RESIZERS 和上面+1一樣,不過這個是判斷參與resize的線程是否已經(jīng)達到所允許的最大值 ,看起來也是一個bug,正確的應(yīng)是sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
(nt = nextTable) == null nextTable為空說明已經(jīng)擴容完畢了,不再需要輔助擴容了
transferIndex <= 0 // 判斷下邊界是不是已經(jīng)到0了,擴容是根據(jù)CPU算出步長,一個擴容線程擴容一段,從高到低分配,transferIndex記錄的就行目前分配到哪個下標(biāo)了,如果這個下標(biāo)小于等于0說明整個容器已經(jīng)分配完了,就不需要在輔助擴容了
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 如果是輔助擴容的sc已經(jīng)處于高16位是容量特征值,低16位標(biāo)識擴容線程量特征值的狀態(tài)了+1讓地位的擴容線程量特征值再+1
transfer(tab, nt); // 進行擴容
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) // 初始擴容現(xiàn)在是+2 執(zhí)行完這一步sc的值高16位是容量特征值,低16位標(biāo)識擴容線程量特征值,因為resizeStamp時將高位置為1是的其又是一個負數(shù),表示正在擴容
transfer(tab, null);
s = sumCount(); // 計算總數(shù)
} // 一直循環(huán)擴容
}
}
解析: count的維護有兩種方式:未產(chǎn)生并發(fā)場景時通過baseCount維護,經(jīng)歷過并發(fā)場景后轉(zhuǎn)變?yōu)橥ㄟ^counterCells維護。baseCount模式為直接使用int進行加減運算,cas保證同步,counterCells模式類似于1.8之前的ConcurrentHashMap,采用一個分段的概念,運算時隨機找一個槽,通過cas保證一個槽的值加算同步,count即為所以槽的加和(使用LongAdder實現(xiàn))。
addCount方法初始即通過counterCells是否為空(為空說明未經(jīng)歷初始化,使用的baseCount模式)來判斷當(dāng)前是否為counterCells模式,如果處于counterCells模式則進入counterCells模式計算邏輯,否則的話使用baseCount模式來維護count并記錄為s,如果遭遇并發(fā)導(dǎo)致維護失敗,則轉(zhuǎn)換為counterCells模式進入counterCells模式計算邏輯。
進入counterCells模式,首先看counterCells是否已完成初始化,若為初始化進入fullAddCount
,已初始化的話則隨機找到counterCells中的一個槽位,如果這個槽位未初始化則進入fullAddCount,如果已初始化則對這個操作進行CAS的加操作來維護count,CAS成功則維護完成,并發(fā)導(dǎo)致CAS維護失敗則進入fullAddCount
。fullAddCount
有點類似于CMS的full GC退化機制,會完成counterCells的初始化以及并發(fā)沖突場景下同步完成count維護。counterCells最后則判斷check是否小于等于1(1表示是當(dāng)前節(jié)點的第一個元素,-1標(biāo)識是replace方法或者clear過來的,這些都不需要擴容)則直接結(jié)束,不進行擴容判定,否則的話通過sumCount
獲得當(dāng)前count并記錄為s以準(zhǔn)備后續(xù)的擴容判定。
check大于等于0(即不是replace或者clear方法引起的count改變)時進行擴容判定,執(zhí)行擴容需要滿足三個條件:1.count大于擴容閾值或者當(dāng)前正在執(zhí)行擴容操作2.容器已初始化完畢3.容量未達到最大容量。滿足以上條件則需判定當(dāng)前是否正在擴容,因為擴容時會將SIZECTL設(shè)置為一個特征值,這個特征值為負值,因此通過sc是否小于0來判定當(dāng)前是否處于擴容狀態(tài)。接下來使用resizeStamp
獲得容量特征值(一個表示容量n的左0數(shù)量并將高位置1的值),如果當(dāng)前未處于擴容過程,則通過容量特征值獲得擴容特征值(高16位為容量特征值,低16位為擴容線程量特征值)并通過CAS修改SIZECTL為擴容特征值,成功則當(dāng)前線程作為第一個擴容線程啟動擴容,失敗則重新計算count并進行擴容判定。
如果已處于擴容過程,則判斷是否還要寫輔助擴容線程,滿足以下幾個條件則:1.此次擴容的容量與當(dāng)前正在進行的容量一致2.擴容線程未達到最大額定值3.不是處于擴容收尾階段(只剩下一個輔助擴容線程,nextTable
已經(jīng)為空,擴容下標(biāo)已為0無法繼續(xù)分配)則使用CAS修改擴容特征值成功后當(dāng)前線程作為輔助擴容線程加入擴容任務(wù),不滿足條件則重新計算count并自旋進行擴容判定。
7 進行擴容-transfer
ConcurrentHashMap的實際擴容操作通過transfer來完成:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// CPU大于1的話 步長就是0.25n/CPU數(shù) 否則就是n 算出來的步長小于最小步長的話 就按最小步長來
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating // nextTab == null 說明是第一個來擴容的線程
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 建立一個二倍長度的數(shù)組,然后賦值給nextTab
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE; // 異常的話就是OOM? 唯一的異常也就是n過大?
return;
}
nextTable = nextTab; // 這個賦值為什么不做同步呢?不怕并發(fā)問題嗎,哦 不會 因為調(diào)用這個方法的地方已經(jīng)確定了,只會有一個初始線程過來
transferIndex = n; // n就是新的長度
}
int nextn = nextTab.length; // 新容量
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 創(chuàng)建一個中繼節(jié)點
boolean advance = true; //一個終止的標(biāo)記
boolean finishing = false; // to ensure sweep before committing nextTab // 是否已經(jīng)處于收尾掃描提交階段
for (int i = 0, bound = 0;;) { // 一直循環(huán)著按步長擴容
Node<K,V> f; int fh;
while (advance) { // 計算出bound和i 賦值TRANSFERINDEX
int nextIndex, nextBound;
if (--i >= bound || finishing) // i大于bound,說明什么呢,說明i已經(jīng)在區(qū)間內(nèi)了
advance = false;
else if ((nextIndex = transferIndex) <= 0) { 如果下一次的上邊界已經(jīng)到0了 也停止
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? //nextBound作為新的上邊界
nextIndex - stride : 0))) {
bound = nextBound; //nextBound作為這次的下邊界
i = nextIndex - 1; // 從nextIndex(上邊界) - 1開始依次遞減到bound區(qū)域
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) { // 如果i不在0-n范圍內(nèi),已經(jīng)越界
int sc;
if (finishing) { // 如果已經(jīng)在收尾階段
nextTable = null; // 重置nextTable
table = nextTab; // 容器確認
sizeCtl = (n << 1) - (n >>> 1); // 2n-0.5n = 0.75*2n 就是新的擴容閾值
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sc減1 意味當(dāng)前擴容線程結(jié)束
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 如果sc-2和容量特征值一樣時,就說明現(xiàn)在是最后的一個擴容線程了
return; // 直接返回
finishing = advance = true; // 不是最后的話 設(shè)置為收尾階段?,確實,進入這里面已經(jīng)越界了...
i = n; // recheck before commit // 提交前的重復(fù)檢查?
}
}
else if ((f = tabAt(tab, i)) == null) // 如果i處還沒有使用
advance = casTabAt(tab, i, null, fwd); // cas設(shè)置i處的新fwdnode,成功還是失敗會記錄在advance上面
else if ((fh = f.hash) == MOVED) // 如果這節(jié)點已經(jīng)是一個中繼節(jié)點 說明什么呢?說明已經(jīng)有人處理過了上一步
advance = true; // already processed
else { // 如果有值,并且不是中繼節(jié)點的話
synchronized (f) { // 同步處理f,所以只會有一個線程作為參與者
if (tabAt(tab, i) == f) { // 同步之后還要再判斷一下 萬一f被并發(fā)處理掉了呢
Node<K,V> ln, hn;
if (fh >= 0) { // fh>=0說明是一個還沒樹化的鏈表?
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) { // 這是找最后一段的開頭節(jié)點
runBit = b;
lastRun = p;
}
}
if (runBit == 0) { // 最后一段的開頭節(jié)點是低位的話
ln = lastRun; // 開頭節(jié)點賦值到ln 也就是說從lastRun開始,之后的都沒有變化了,都是在同一位
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) { // 只需遍歷到lastRun即可
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln); // 往前補
else
hn = new Node<K,V>(ph, pk, pv, hn); // 往前補
}
setTabAt(nextTab, i, ln); // 賦值新容器
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd); // 舊容器節(jié)點變?yōu)橹欣^節(jié)點,但是如果在這一步之前就有新元素來了怎么辦,不會來,因為新增也是sync(f)的
advance = true;
}
else if (f instanceof TreeBin) { // 如果是一個樹化節(jié)點
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : // 退化或者重新樹化
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
解析: transfer
采用分段擴容的方式,從n-1->0,每個線程每次會占用一個步長的區(qū)間,然后針對這個區(qū)間進行擴容,擴容完畢再去占用下一個區(qū)間,直到無法占用新的區(qū)間則結(jié)束擴容流程。
步長的計算與應(yīng)用的CPU數(shù)有關(guān),如果當(dāng)前應(yīng)用CPU數(shù)為1則步長為n,即單線程庫擴容。如果當(dāng)前應(yīng)用CPU數(shù)不為1,則為0.25n/CPU,但是如果算出的步長小于最小MIN_TRANSFER_STRIDE(16),則以MIN_TRANSFER_STRIDE為步長。
接下來通過nextTable
是否為空判斷是否為初始擴容線程,如果是的話就創(chuàng)建一個長度為2n的新容器并記錄到nextTable屬性,所有擴容線程共同使用nextTable作為新容器,同時記錄當(dāng)前擴容進度下標(biāo)transferIndex
為n(擴容占用是從n到0的,因此初始未占用時就是n)。這里雖然沒進行加鎖但是依然不會有并發(fā)問題,因為nextTable為空的只有初始線程,而初始線程的創(chuàng)建則是線程安全的。
步長確認完畢,新容器創(chuàng)建完畢,擴容進度下標(biāo)初始化完成,接下來就開始進行區(qū)間占用了:使用自旋+CAS來進行線程安全的區(qū)間占用,advance
作為自旋結(jié)束的條件:如果i大于bound意味著本次占用的區(qū)間還沒處理完則結(jié)束自旋。finishing
意味著擴容已經(jīng)進入尾聲了,也無需再加入擴容,結(jié)束自旋。擴容進度下標(biāo)transferIndex
小于等于0說明所有的區(qū)間都已經(jīng)被占用了也無需再加入擴容,結(jié)束自旋。如果以上條件不成立,則使用CAS修改擴容進度下標(biāo)transferIndex
嘗試占用一個步長的區(qū)間,如果失敗則自旋繼續(xù)判定區(qū)間占用,修改成功則意味著區(qū)間占用成功,賦值本次區(qū)間的擴容游標(biāo)i以及下標(biāo)邊界bound
。
區(qū)間占用成功后開始從i到bound
的擴容階段,首先需要判斷i的合法性(需要在i和n之間,即判斷是不是已擴容完畢,如–i的循環(huán)會使得i為負數(shù)表示擴容完畢),如果沒占據(jù)到區(qū)間則判斷是不是處于于finishing
階段,處于的話說明自己以及是最后一個擴容線程了,將新容器nextTable
賦值為table,并設(shè)置新的擴容閾值。如果不是處于finishing
階段則因為當(dāng)前線程沒占據(jù)到區(qū)間需要退出擴容,那么響應(yīng)的擴容特征值也要相應(yīng)的-1來完成擴容線程量特征值的維護,如果擴容特征值維護失敗則需要繼續(xù)自旋嘗試退出,如果維護成功則判斷自己退出之后是不是只能先一個擴容線程,因為起始擴容線程的線程擴容量特征值為+2,所以通過-2來判斷,如果自己退出之后僅剩一個線程在擴容,那么就把finishing
設(shè)置為true,以使其在擴容工作結(jié)束后進行新容器的賦值以及新擴容閾值的維護。
如果i處于合法范圍內(nèi),說明處于正常擴容中,那么獲取i處的節(jié)點,節(jié)點為空則利用一個CAS為其賦值一個ForwardingNode
節(jié)點,這個節(jié)點是一個中繼節(jié)點,意味著這個節(jié)點正處于擴容狀態(tài),其hash值為特殊值MOVED(-1),在5 新數(shù)據(jù)載入-putVal 的過程中如果發(fā)現(xiàn)目標(biāo)節(jié)點hash值為MOVED,那么putVal線程就會暫停put操作,作為輔助擴容線程先行擴容容器,擴容完畢后再進行put操作。
如果i處節(jié)點不會空但是hash值已經(jīng)是MOVED,那么說明節(jié)點已經(jīng)處于遷移狀態(tài)則跳過。
如果以上均不符合,說明i合法,且i處擁有實際的數(shù)據(jù)節(jié)點,那么使用i處的節(jié)點f作為鎖通過synchronized來達成線程安全的擴容,因為putVal過程中也是用i處的節(jié)點f作為鎖進行synchronized的,意味著對i處的操作擴容和賦值只有一個過程能操作,以此來保證putVal和transfer的并發(fā)安全性。synchronized內(nèi)部的擴容過程因為已經(jīng)保證了線程同步,因此和HashMap的擴容過程區(qū)別并不大,這里不再描述,感興趣的話可以查看9: 從源碼看HashMap:一文看懂HashMap了解。
8 輔助擴容-helpTransfer
在5 新數(shù)據(jù)載入-putVal 中以及7 進行擴容-transfer中都提到過如果putVal過程中發(fā)現(xiàn)目標(biāo)節(jié)點是一個中繼節(jié)點ForwardingNode
(hash值為特殊值MOVED)那么putVal會暫停put操作,通過helpTransfer
方法進行輔助擴容:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
解析: 這個方法是不是看起來很熟悉,是的,除了ForwardingNode
判斷,其主要邏輯和6 維護與啟動擴容-addCount判定擴容時的邏輯大致相當(dāng)。因此也不在額外表述。這里貼出來的目的僅僅是為了完整的體現(xiàn)putVal遭遇擴容并發(fā)的處理過程。
以上就是java核心基礎(chǔ)之從源碼看ConcurrentHashMap的詳細內(nèi)容,更多關(guān)于JAVA核心知識之從源碼看ConcurrentHashMap的資料請關(guān)注W3Cschool其它相關(guān)文章!