App下載

java核心基礎(chǔ) ConcurrentHashMap源碼分析

猿友 2021-07-19 15:47:02 瀏覽數(shù) (1882)
反饋

HashMap 作為我們平日里 java 開(kāi)發(fā)使用最多的集合,本篇文章要介紹的 ConcurrentHashMap 是前者的升級(jí),也許有些小伙伴有聽(tīng)說(shuō)過(guò)。本篇文章將帶大家簡(jiǎn)單地了解一下 java 的核心基礎(chǔ) ConcurrentHashMap 的知識(shí)內(nèi)容。

1 前言

ConcurrentHashMap是基于Hash表的Map接口實(shí)現(xiàn),鍵與值均不允許為NULL,他是一個(gè)線程安全的Map。同時(shí)他也是一個(gè)無(wú)序的Map,不同時(shí)間進(jìn)行遍歷可能會(huì)得到不同的順序。在JDK1.8之前,ConcurrentHashMap使用分段鎖以在保證線程安全的同時(shí)獲得更大的效率。JDK1.8開(kāi)始舍棄了分段鎖,使用自旋+CAS+sync關(guān)鍵字來(lái)實(shí)現(xiàn)同步。本文所述便是基于JDK1.8。
ConcurrentHashMap與HashMap有共同之處,一些HashMap的基本概念與實(shí)現(xiàn),本文不再贅述。

2 繼承關(guān)系

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable

可以看到ConcurrentHashMap繼承了AbstractMap及ConcurrentMap抽象類(lèi),并實(shí)現(xiàn)了Serializable接口,這說(shuō)明ConcurrentHashMap是一個(gè)線程安全的標(biāo)準(zhǔn)Map,且允許序列化。與HashMap不同是ConcurrentHashMap不允許Clone。

3 構(gòu)造方法

ConcurrentHashMap同樣是采用懶初始化的方式,有實(shí)際元素時(shí)才進(jìn)行容器的初始化。因此其構(gòu)造方法與HashMap相差無(wú)幾。

// 無(wú)參構(gòu)造
public ConcurrentHashMap() {
}
// 帶有初始容量的構(gòu)造
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); //求最小二次冪
    this.sizeCtl = cap; // sizeCtl 暫存容量
}
// 帶有初始集合的構(gòu)造
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}
// 指定初始容量和裝載因子的構(gòu)造
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

以上構(gòu)造并沒(méi)有什么特別的,邏輯也相對(duì)簡(jiǎn)單,不再詳細(xì)解析,感興趣的話可以到前言提到的HashMap篇了解。
除此之外,ConcurrentHashMap擁有另外一個(gè)值得注意的構(gòu)造方法:
指定初始容量,裝載因子以及并發(fā)級(jí)別的構(gòu)造方法:
源碼:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 基礎(chǔ)校驗(yàn)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // 取初始容量和并發(fā)級(jí)別的最大值
        initialCapacity = concurrencyLevel;  
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);  // 指定了裝載因子,因此就用初始容量和裝載因子計(jì)算出實(shí)際要的初始容量
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);	// 確定計(jì)算出初始容量,
    this.sizeCtl = cap;
}

解析: 除了初始容量與裝載因子外,此構(gòu)造方法還有一個(gè)并發(fā)級(jí)別concurrencyLevel的參數(shù)。在jdk1.7時(shí),并發(fā)級(jí)別作為分段鎖的標(biāo)準(zhǔn)進(jìn)行分段。但是jdk1.8開(kāi)始舍棄了分段鎖,為了版本兼容,此構(gòu)造方法依然存在,但是concurrencyLevel也不再具有其分段依據(jù)的意義,而是作為初始容量的定義依據(jù)。

4 初始化

ConcurrentHashMap采用懶初始化的方式,在第一次putAvl時(shí)如果容器為空,則會(huì)調(diào)用initTable()進(jìn)行容器的初始化:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {    // tab為空時(shí)就一直循環(huán)
        if ((sc = sizeCtl) < 0)       // sc小于0說(shuō)明正在初始化或者正在復(fù)制,放棄CPU等下一次
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    // 否則的話就用CAS把sizectl設(shè)置成-1,標(biāo)識(shí)正在初始化,且成功的話
            try {
			// CAS之后再判斷一下,這是為了避免并發(fā),比如上面判斷了tab為空,然后另一個(gè)線程做了初始化操作,結(jié)束時(shí)sc被設(shè)置為擴(kuò)容閾值,然后繼續(xù)(sc = sizeCtl) < 0,這時(shí)cs還是大于0,所以還是能走進(jìn)來(lái)的,所以這里再判斷一下
                if ((tab = table) == null || tab.length == 0) {   
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;     // sc最初始是存在的初始容量的
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];   // 建立一個(gè)容器
                    table = tab = nt; // 然后復(fù)制到table屬性上
                    sc = n - (n >>> 2);   // sc等于 n>>>2就是n/4就是0.25n   sc = 0.75n,相當(dāng)于設(shè)置擴(kuò)容閾值
                }
            } finally {
                sizeCtl = sc; // sizeCtl賦值成擴(kuò)容閾值
            }
            break;    // 只要走到這里面,那么說(shuō)明一定已經(jīng)初始化結(jié)束了,無(wú)論初始化是自己做的還是其他線程做的,然后就跳出
        }
    }
    return tab;   // 返回新的tab
}

解析: 容器初始化完成之前不斷的進(jìn)行循環(huán)。這是因?yàn)镃oncurrentHashMap是一個(gè)支持并發(fā)的Map,可能同時(shí)會(huì)有多個(gè)線程進(jìn)入initTable方法,但是只有一個(gè)線程執(zhí)行初始化操作,那么剩下的線程就需要等待初始化完成再跳出initTable方法,以滿足接下來(lái)的putVal操作。
為了保證只有一個(gè)線程執(zhí)行初始化操作,使用sizeCtl來(lái)作為標(biāo)識(shí),sizeCtl為-1時(shí)即說(shuō)明當(dāng)前已有線程正在初始化,則放棄CPU繼續(xù)循環(huán)。sizeCtl不為負(fù)數(shù)時(shí),則使用CAS(通過(guò)Unsafe+偏移量的方式實(shí)現(xiàn))將sizeCtl置為-1,如果當(dāng)前線程成功的話則進(jìn)入實(shí)際的擴(kuò)容操作。
通過(guò)CAS鎖定成功后再次判斷容器是否為空,這是為了避免并發(fā),比如上面判斷了tab為空,然后另一個(gè)線程做了初始化操作,結(jié)束時(shí)sc被設(shè)置為擴(kuò)容閾值,然后繼續(xù)(sc = sizeCtl) < 0,這時(shí)cs還是大于0,所以還是能走進(jìn)來(lái)的,所以這里再判斷一下。
如果sc(sizeCtl原值)大于0,則以sc做為初始容量。這個(gè)在構(gòu)造方法篇提到過(guò),對(duì)于有初始容量要求的構(gòu)造,會(huì)以sizeCtl暫存初始容量。否則的話就取默認(rèn)的初始容量DEFAULT_CAPACITY(16)。
接下來(lái)就建立目標(biāo)容量的容器并賦值給容器屬性table,計(jì)算該容量下的擴(kuò)容閾值賦值給sizeCtlsizeCtl的賦值放到finally里面的原因是因?yàn)闊o(wú)論容器創(chuàng)建成功還是失敗,都需要放開(kāi)以sizeCtl為負(fù)值作為判斷條件的鎖,以保證在本線程創(chuàng)建失敗的情況其它線程能繼續(xù)競(jìng)爭(zhēng)鎖繼續(xù)進(jìn)行容器創(chuàng)建的工作。
至此,容器的初始化便完成了。

5 新數(shù)據(jù)載入-putVal

ConcurrentHashMap新數(shù)據(jù)載入主要通過(guò)putVal實(shí)現(xiàn):

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { // 會(huì)一直循環(huán),直到break
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();  // 初始化table
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    // unsafe獲取i處的數(shù)據(jù),如果為null
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))    // cas操作,如果i處為null就構(gòu)造新元素,成功就break  失敗就繼續(xù)循環(huán)
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)        // f處的元素hash是-1   -1意味著這是一個(gè)遷移節(jié)點(diǎn)
            tab = helpTransfer(tab, f);   // 輔助遷移開(kāi)始
        else {
            V oldVal = null;
            synchronized (f) {    // 用f進(jìn)行同步
                if (tabAt(tab, i) == f) { // 這里要保證i處還是f,而不是已經(jīng)被其它元素并發(fā)占據(jù)了
                    if (fh >= 0) {    // f處的hash要大于等于0,這是為什么呢?除了MOVED還會(huì)有其它負(fù)值嗎
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {  // 從e往后找,并且記錄元素?cái)?shù)量
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) { // 一系列判斷Key是否相等,如果相等
                                oldVal = e.val;   // 記錄舊值
                                if (!onlyIfAbsent)    // 不是僅允許新增的話
                                    e.val = value;    // 記錄新值
                                break;    // 成功的話就跳出大循環(huán)
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {   // e向后移動(dòng),  如果移動(dòng)之后是nul,就是在末尾的話
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);   // 建立一個(gè)新節(jié)點(diǎn)
                                break;     // 成功的話就跳出大循環(huán)
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {  // 如果是一個(gè)樹(shù)化節(jié)點(diǎn)
                        Node<K,V> p;
                        binCount = 2; // 樹(shù)化的節(jié)點(diǎn)的話固定元素?cái)?shù)量為2,這是一個(gè)相對(duì)特殊的值,即會(huì)擴(kuò)容又不會(huì)重復(fù)樹(shù)化
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) { // 往樹(shù)里面新增元素,如果存在就返回那個(gè)節(jié)點(diǎn),不存在就是新增節(jié)點(diǎn)
                            oldVal = p.val;   // 記錄舊值
                            if (!onlyIfAbsent)    // 不是僅允許新增的話
                                p.val = value; // 記錄新值
                        }
                    }
                }
            }
            if (binCount != 0) {  // 如果元素?cái)?shù)不為0的話
                if (binCount >= TREEIFY_THRESHOLD)    // 判斷是否到了樹(shù)化閾值
                    treeifyBin(tab, i);   // 到了的話進(jìn)行樹(shù)化
                if (oldVal != null)   // 覆蓋操作直接返回舊值不會(huì)執(zhí)行addCount
                    return oldVal;
                break;     // binCount != 0才跳出大循環(huán),為0是tabAt(tab, i) != f導(dǎo)致的,即i處已經(jīng)被其他線程覆蓋了
            }
        }
    }
    addCount(1L, binCount);   // 
    return null;
}

解析: 完成為空校驗(yàn)后,通過(guò)spread方法來(lái)計(jì)算出hash以確定下標(biāo)位置,spread的計(jì)算方式為(h ^ (h >>> 16)) & HASH_BITS,HASH_BITS的值為0x7fffffff,描述出來(lái)就是將hashCode的高16位和低16位做異或操作,并保證最高位符號(hào)位為0(結(jié)果是一個(gè)正數(shù)),注解表示這樣做的原因是為了使數(shù)據(jù)更加分散,盡可能的避免hash沖突。
如果容器tab為空或者長(zhǎng)度為0說(shuō)明容器未初始化,那么就調(diào)用initTable進(jìn)行容器初始化。
否則的話對(duì)hash與現(xiàn)容量進(jìn)行與運(yùn)算得出數(shù)據(jù)應(yīng)處的下標(biāo),判斷此下標(biāo)所在處節(jié)點(diǎn)是否為空,為空說(shuō)明沒(méi)有元素,直接創(chuàng)建一個(gè)新節(jié)點(diǎn)作為頭元素放進(jìn)去,這一步通過(guò)CAS操作實(shí)現(xiàn),避免并發(fā)情況下另外的線程先一步完成頭節(jié)點(diǎn)創(chuàng)建操作。
如果下標(biāo)所在處節(jié)點(diǎn)不為空,說(shuō)明該處已經(jīng)有元素了,此時(shí)判斷頭節(jié)點(diǎn)的hash是否為MOVED,MOVED的值為-1。上面提到過(guò),正常元素通過(guò)spread方法計(jì)算出來(lái)的hash值都會(huì)使正數(shù)。此處-1為一個(gè)特殊值,意味著此節(jié)點(diǎn)正在進(jìn)行擴(kuò)容遷移工作,那么此時(shí)就調(diào)用8 輔助擴(kuò)容-helpTransfer方法進(jìn)行輔助遷移工作。
如果節(jié)點(diǎn)hash不為MOVED,意味著這是一個(gè)正常節(jié)點(diǎn),就執(zhí)行元素載入工作,使用synchronized關(guān)鍵字實(shí)現(xiàn)同步, 同步塊內(nèi)開(kāi)始還要使用tabAt(tab, i) == f判斷進(jìn)入同步后i處的節(jié)點(diǎn)還是原節(jié)點(diǎn),接下來(lái)就是元素的載入工作了,整體和HashMap的流程是相差無(wú)幾的,感興趣的話可以去文首提到的解析HashMap的文章了解一下。synchronized塊將元素載入節(jié)點(diǎn)后,接著會(huì)對(duì)binCount進(jìn)行判斷,binCount在節(jié)點(diǎn)還處于鏈表模式下的情況下記錄節(jié)點(diǎn)在新元素載入后的元素總數(shù)量,此處判斷binCount達(dá)到TREEIFY_THRESHOLD(8) 樹(shù)化閾值后,就會(huì)對(duì)該節(jié)點(diǎn)進(jìn)行樹(shù)化操作。樹(shù)化操作依然是通過(guò)synchronized來(lái)完成的,這里不做過(guò)多延伸。
以上整個(gè)新元素載入操作都是一個(gè)自旋的過(guò)程,一直到新元素載入成功后通過(guò)break跳出自旋過(guò)程。
新元素載入節(jié)點(diǎn)后,需要對(duì)count實(shí)時(shí)維護(hù),count通過(guò) 6 維護(hù)與啟動(dòng)擴(kuò)容-addCount 方法完成同步維護(hù)工作。

6 維護(hù)與啟動(dòng)擴(kuò)容-addCount

ConcurrentHashMap通過(guò)addCount方法實(shí)時(shí)維護(hù)內(nèi)部元素?cái)?shù)量,并在達(dá)到擴(kuò)容閾值的情況下啟動(dòng)擴(kuò)容操作:

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;  
    if ((as = counterCells) != null ||    // counterCells不為空,如果沒(méi)經(jīng)歷過(guò)并發(fā)肯定是空的,這個(gè)判斷意思是維護(hù)count的過(guò)程中已經(jīng)經(jīng)歷過(guò)并發(fā),已經(jīng)處于不使用baseCount維護(hù)的階段了
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {       //這個(gè)比較唯一會(huì)出現(xiàn)false的情況就是b = baseCount執(zhí)行之后,還沒(méi)進(jìn)行cas時(shí),baseCount又被改了。意思是如果沒(méi)有并發(fā)就用baseCount維護(hù),并發(fā)導(dǎo)致baseCount維護(hù)失敗就進(jìn)入counterCells維護(hù)
        CounterCell a; long v; int m;
        boolean uncontended = true;   // 記錄是否遭遇了并發(fā)
        if (as == null || (m = as.length - 1) < 0 ||      // as等于null意味著baseCount維護(hù)遭遇并發(fā)了,且是初次(非初次counterCells不會(huì)為null)  (m = as.length - 1) < 0說(shuō)明counterCells長(zhǎng)度為0,即沒(méi)有實(shí)際內(nèi)容
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||     // counterCells是一個(gè)分段計(jì)算的概念,這里就是隨機(jī)找一個(gè)槽,如果這個(gè)槽位空的話
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {      // 槽不為空的話就用cas把槽里面的值賦值給v+a.val    如果失敗的話
            fullAddCount(x, uncontended); //  用fullAddCount  就是LongAdder的模式將x放進(jìn)去,初始化counterCells
            return;
        }
        if (check <= 1)       // check小于等于1(1表示是當(dāng)前節(jié)點(diǎn)的第一個(gè)元素,-1標(biāo)識(shí)是replace方法或者clear過(guò)來(lái)的,這些都不需要擴(kuò)容)  
            return;
        s = sumCount();   // 計(jì)算一下現(xiàn)在的總量
    }
    if (check >= 0) {  //baseCount成功的情況且 check大于0,即不是replace或者clear方法進(jìn)來(lái)的
        Node<K,V>[] tab, nt; int n, sc;
        /**
        1.s >= (long)(sc = sizeCtl)  s是總數(shù)量  s大于等于擴(kuò)容閾值sizeCtl時(shí),或者sizeCtl為負(fù)數(shù),正在擴(kuò)容時(shí)
        2.(tab = table) != null 并且容器tab不為空,意思就是表已經(jīng)初始化完畢了,是執(zhí)行擴(kuò)容而不是執(zhí)行初始化
        3. (n = tab.length) < MAXIMUM_CAPACITY)  并且容量還沒(méi)有達(dá)到最大容量時(shí)
         */
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);  // 獲得容量特征值:容量n的左0數(shù)量并將高位置1
            if (sc < 0) { // 如果sc小于0  意味這個(gè)正在進(jìn)行并發(fā)擴(kuò)容
                /**
                sc >>> RESIZE_STAMP_SHIFT != rs是判斷現(xiàn)在的容量與sc中記錄的長(zhǎng)度是不是不一樣長(zhǎng),不一樣長(zhǎng)的話說(shuō)明不是同一長(zhǎng)度的擴(kuò)容,就要break-保證是同等長(zhǎng)度的擴(kuò)容
                sc == rs + 1 這個(gè)地方在看來(lái)是一個(gè)bug,如果已經(jīng)處于擴(kuò)容中,sc目前處于高16位是容量特征值,低16位標(biāo)識(shí)擴(kuò)容線程量特征值的狀態(tài),而此時(shí)的rs是容量特征值的狀態(tài),也就是目前sc的高16位,這里原意應(yīng)該是判斷目前是不是只剩下一個(gè)非初始擴(kuò)容線程
                             ,為什么這么說(shuō)呢,因?yàn)槌跏紨U(kuò)容線程會(huì)把sc設(shè)置稱高16位是容量特征值即rs,低16位標(biāo)識(shí)擴(kuò)容線程量特征值(初始擴(kuò)容線程+2,非初始擴(kuò)容線程+1),這一步判斷的就是如過(guò)只剩下一個(gè)非初始擴(kuò)容線程了,說(shuō)明初始擴(kuò)容線程已經(jīng)結(jié)束,現(xiàn)在已經(jīng)進(jìn)入擴(kuò)容的最后階段了,就不需要再繼續(xù)進(jìn)行輔助擴(kuò)容了
                            ,正確的方式應(yīng)該是sc == (rs << RESIZE_STAMP_SHIFT) + 1
                sc == rs + MAX_RESIZERS  和上面+1一樣,不過(guò)這個(gè)是判斷參與resize的線程是否已經(jīng)達(dá)到所允許的最大值 ,看起來(lái)也是一個(gè)bug,正確的應(yīng)是sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
                (nt = nextTable) == null  nextTable為空說(shuō)明已經(jīng)擴(kuò)容完畢了,不再需要輔助擴(kuò)容了
                transferIndex <= 0   // 判斷下邊界是不是已經(jīng)到0了,擴(kuò)容是根據(jù)CPU算出步長(zhǎng),一個(gè)擴(kuò)容線程擴(kuò)容一段,從高到低分配,transferIndex記錄的就行目前分配到哪個(gè)下標(biāo)了,如果這個(gè)下標(biāo)小于等于0說(shuō)明整個(gè)容器已經(jīng)分配完了,就不需要在輔助擴(kuò)容了
                 */
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))       // 如果是輔助擴(kuò)容的sc已經(jīng)處于高16位是容量特征值,低16位標(biāo)識(shí)擴(kuò)容線程量特征值的狀態(tài)了+1讓地位的擴(kuò)容線程量特征值再+1
                    transfer(tab, nt);    // 進(jìn)行擴(kuò)容
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2)) // 初始擴(kuò)容現(xiàn)在是+2 執(zhí)行完這一步sc的值高16位是容量特征值,低16位標(biāo)識(shí)擴(kuò)容線程量特征值,因?yàn)閞esizeStamp時(shí)將高位置為1是的其又是一個(gè)負(fù)數(shù),表示正在擴(kuò)容
                transfer(tab, null);
            s = sumCount(); // 計(jì)算總數(shù)
        } // 一直循環(huán)擴(kuò)容
    }
}

解析: count的維護(hù)有兩種方式:未產(chǎn)生并發(fā)場(chǎng)景時(shí)通過(guò)baseCount維護(hù),經(jīng)歷過(guò)并發(fā)場(chǎng)景后轉(zhuǎn)變?yōu)橥ㄟ^(guò)counterCells維護(hù)。baseCount模式為直接使用int進(jìn)行加減運(yùn)算,cas保證同步,counterCells模式類(lèi)似于1.8之前的ConcurrentHashMap,采用一個(gè)分段的概念,運(yùn)算時(shí)隨機(jī)找一個(gè)槽,通過(guò)cas保證一個(gè)槽的值加算同步,count即為所以槽的加和(使用LongAdder實(shí)現(xiàn))。
addCount方法初始即通過(guò)counterCells是否為空(為空說(shuō)明未經(jīng)歷初始化,使用的baseCount模式)來(lái)判斷當(dāng)前是否為counterCells模式,如果處于counterCells模式則進(jìn)入counterCells模式計(jì)算邏輯,否則的話使用baseCount模式來(lái)維護(hù)count并記錄為s,如果遭遇并發(fā)導(dǎo)致維護(hù)失敗,則轉(zhuǎn)換為counterCells模式進(jìn)入counterCells模式計(jì)算邏輯。
進(jìn)入counterCells模式,首先看counterCells是否已完成初始化,若為初始化進(jìn)入fullAddCount,已初始化的話則隨機(jī)找到counterCells中的一個(gè)槽位,如果這個(gè)槽位未初始化則進(jìn)入fullAddCount,如果已初始化則對(duì)這個(gè)操作進(jìn)行CAS的加操作來(lái)維護(hù)count,CAS成功則維護(hù)完成,并發(fā)導(dǎo)致CAS維護(hù)失敗則進(jìn)入fullAddCount。fullAddCount有點(diǎn)類(lèi)似于CMS的full GC退化機(jī)制,會(huì)完成counterCells的初始化以及并發(fā)沖突場(chǎng)景下同步完成count維護(hù)。counterCells最后則判斷check是否小于等于1(1表示是當(dāng)前節(jié)點(diǎn)的第一個(gè)元素,-1標(biāo)識(shí)是replace方法或者clear過(guò)來(lái)的,這些都不需要擴(kuò)容)則直接結(jié)束,不進(jìn)行擴(kuò)容判定,否則的話通過(guò)sumCount獲得當(dāng)前count并記錄為s以準(zhǔn)備后續(xù)的擴(kuò)容判定。
check大于等于0(即不是replace或者clear方法引起的count改變)時(shí)進(jìn)行擴(kuò)容判定,執(zhí)行擴(kuò)容需要滿足三個(gè)條件:1.count大于擴(kuò)容閾值或者當(dāng)前正在執(zhí)行擴(kuò)容操作2.容器已初始化完畢3.容量未達(dá)到最大容量。滿足以上條件則需判定當(dāng)前是否正在擴(kuò)容,因?yàn)閿U(kuò)容時(shí)會(huì)將SIZECTL設(shè)置為一個(gè)特征值,這個(gè)特征值為負(fù)值,因此通過(guò)sc是否小于0來(lái)判定當(dāng)前是否處于擴(kuò)容狀態(tài)。接下來(lái)使用resizeStamp獲得容量特征值(一個(gè)表示容量n的左0數(shù)量并將高位置1的值),如果當(dāng)前未處于擴(kuò)容過(guò)程,則通過(guò)容量特征值獲得擴(kuò)容特征值(高16位為容量特征值,低16位為擴(kuò)容線程量特征值)并通過(guò)CAS修改SIZECTL為擴(kuò)容特征值,成功則當(dāng)前線程作為第一個(gè)擴(kuò)容線程啟動(dòng)擴(kuò)容,失敗則重新計(jì)算count并進(jìn)行擴(kuò)容判定。
如果已處于擴(kuò)容過(guò)程,則判斷是否還要寫(xiě)輔助擴(kuò)容線程,滿足以下幾個(gè)條件則:1.此次擴(kuò)容的容量與當(dāng)前正在進(jìn)行的容量一致2.擴(kuò)容線程未達(dá)到最大額定值3.不是處于擴(kuò)容收尾階段(只剩下一個(gè)輔助擴(kuò)容線程,nextTable已經(jīng)為空,擴(kuò)容下標(biāo)已為0無(wú)法繼續(xù)分配)則使用CAS修改擴(kuò)容特征值成功后當(dāng)前線程作為輔助擴(kuò)容線程加入擴(kuò)容任務(wù),不滿足條件則重新計(jì)算count并自旋進(jìn)行擴(kuò)容判定。

7 進(jìn)行擴(kuò)容-transfer

ConcurrentHashMap的實(shí)際擴(kuò)容操作通過(guò)transfer來(lái)完成:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
   // CPU大于1的話  步長(zhǎng)就是0.25n/CPU數(shù)    否則就是n     算出來(lái)的步長(zhǎng)小于最小步長(zhǎng)的話  就按最小步長(zhǎng)來(lái)
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating   // nextTab == null 說(shuō)明是第一個(gè)來(lái)擴(kuò)容的線程
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  // 建立一個(gè)二倍長(zhǎng)度的數(shù)組,然后賦值給nextTab
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;      // 異常的話就是OOM?  唯一的異常也就是n過(guò)大?
            return;
        }
        nextTable = nextTab;  // 這個(gè)賦值為什么不做同步呢?不怕并發(fā)問(wèn)題嗎,哦  不會(huì)  因?yàn)檎{(diào)用這個(gè)方法的地方已經(jīng)確定了,只會(huì)有一個(gè)初始線程過(guò)來(lái)
        transferIndex = n;    //   n就是新的長(zhǎng)度   
    }
    int nextn = nextTab.length;   // 新容量
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);   // 創(chuàng)建一個(gè)中繼節(jié)點(diǎn)
    boolean advance = true;       //一個(gè)終止的標(biāo)記
    boolean finishing = false; // to ensure sweep before committing nextTab  // 是否已經(jīng)處于收尾掃描提交階段
    for (int i = 0, bound = 0;;) {    // 一直循環(huán)著按步長(zhǎng)擴(kuò)容
        Node<K,V> f; int fh;
        while (advance) {     //  計(jì)算出bound和i   賦值TRANSFERINDEX
            int nextIndex, nextBound;
            if (--i >= bound || finishing)        // i大于bound,說(shuō)明什么呢,說(shuō)明i已經(jīng)在區(qū)間內(nèi)了
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {      如果下一次的上邊界已經(jīng)到0了 也停止
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?           //nextBound作為新的上邊界
                                   nextIndex - stride : 0))) {
                bound = nextBound;        //nextBound作為這次的下邊界
                i = nextIndex - 1;    // 從nextIndex(上邊界) - 1開(kāi)始依次遞減到bound區(qū)域
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {  // 如果i不在0-n范圍內(nèi),已經(jīng)越界
            int sc;
            if (finishing) {  // 如果已經(jīng)在收尾階段
                nextTable = null; // 重置nextTable
                table = nextTab;  // 容器確認(rèn)
                sizeCtl = (n << 1) - (n >>> 1); // 2n-0.5n = 0.75*2n   就是新的擴(kuò)容閾值
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {   // sc減1  意味當(dāng)前擴(kuò)容線程結(jié)束
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)     // 如果sc-2和容量特征值一樣時(shí),就說(shuō)明現(xiàn)在是最后的一個(gè)擴(kuò)容線程了
                    return;   // 直接返回
                finishing = advance = true;   // 不是最后的話   設(shè)置為收尾階段?,確實(shí),進(jìn)入這里面已經(jīng)越界了...
                i = n; // recheck before commit           // 提交前的重復(fù)檢查?
            }
        }
        else if ((f = tabAt(tab, i)) == null)     // 如果i處還沒(méi)有使用
            advance = casTabAt(tab, i, null, fwd);    // cas設(shè)置i處的新fwdnode,成功還是失敗會(huì)記錄在advance上面
        else if ((fh = f.hash) == MOVED)  // 如果這節(jié)點(diǎn)已經(jīng)是一個(gè)中繼節(jié)點(diǎn)   說(shuō)明什么呢?說(shuō)明已經(jīng)有人處理過(guò)了上一步
            advance = true; // already processed
        else {    // 如果有值,并且不是中繼節(jié)點(diǎn)的話
            synchronized (f) {    // 同步處理f,所以只會(huì)有一個(gè)線程作為參與者
                if (tabAt(tab, i) == f) { // 同步之后還要再判斷一下  萬(wàn)一f被并發(fā)處理掉了呢
                    Node<K,V> ln, hn;
                    if (fh >= 0) {    // fh>=0說(shuō)明是一個(gè)還沒(méi)樹(shù)化的鏈表?
                        int runBit = fh & n;  
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {    // 這是找最后一段的開(kāi)頭節(jié)點(diǎn)
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {    // 最后一段的開(kāi)頭節(jié)點(diǎn)是低位的話
                            ln = lastRun; // 開(kāi)頭節(jié)點(diǎn)賦值到ln   也就是說(shuō)從lastRun開(kāi)始,之后的都沒(méi)有變化了,都是在同一位
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) { // 只需遍歷到lastRun即可
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);   // 往前補(bǔ)
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);   // 往前補(bǔ)
                        }
                        setTabAt(nextTab, i, ln); // 賦值新容器
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);    // 舊容器節(jié)點(diǎn)變?yōu)橹欣^節(jié)點(diǎn),但是如果在這一步之前就有新元素來(lái)了怎么辦,不會(huì)來(lái),因?yàn)樾略鲆彩莝ync(f)的
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {  // 如果是一個(gè)樹(shù)化節(jié)點(diǎn)
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :    // 退化或者重新樹(shù)化
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

解析: transfer采用分段擴(kuò)容的方式,從n-1->0,每個(gè)線程每次會(huì)占用一個(gè)步長(zhǎng)的區(qū)間,然后針對(duì)這個(gè)區(qū)間進(jìn)行擴(kuò)容,擴(kuò)容完畢再去占用下一個(gè)區(qū)間,直到無(wú)法占用新的區(qū)間則結(jié)束擴(kuò)容流程。
步長(zhǎng)的計(jì)算與應(yīng)用的CPU數(shù)有關(guān),如果當(dāng)前應(yīng)用CPU數(shù)為1則步長(zhǎng)為n,即單線程庫(kù)擴(kuò)容。如果當(dāng)前應(yīng)用CPU數(shù)不為1,則為0.25n/CPU,但是如果算出的步長(zhǎng)小于最小MIN_TRANSFER_STRIDE(16),則以MIN_TRANSFER_STRIDE為步長(zhǎng)。
接下來(lái)通過(guò)nextTable 是否為空判斷是否為初始擴(kuò)容線程,如果是的話就創(chuàng)建一個(gè)長(zhǎng)度為2n的新容器并記錄到nextTable屬性,所有擴(kuò)容線程共同使用nextTable作為新容器,同時(shí)記錄當(dāng)前擴(kuò)容進(jìn)度下標(biāo)transferIndex為n(擴(kuò)容占用是從n到0的,因此初始未占用時(shí)就是n)。這里雖然沒(méi)進(jìn)行加鎖但是依然不會(huì)有并發(fā)問(wèn)題,因?yàn)閚extTable為空的只有初始線程,而初始線程的創(chuàng)建則是線程安全的。
步長(zhǎng)確認(rèn)完畢,新容器創(chuàng)建完畢,擴(kuò)容進(jìn)度下標(biāo)初始化完成,接下來(lái)就開(kāi)始進(jìn)行區(qū)間占用了:使用自旋+CAS來(lái)進(jìn)行線程安全的區(qū)間占用,advance作為自旋結(jié)束的條件:如果i大于bound意味著本次占用的區(qū)間還沒(méi)處理完則結(jié)束自旋。finishing意味著擴(kuò)容已經(jīng)進(jìn)入尾聲了,也無(wú)需再加入擴(kuò)容,結(jié)束自旋。擴(kuò)容進(jìn)度下標(biāo)transferIndex小于等于0說(shuō)明所有的區(qū)間都已經(jīng)被占用了也無(wú)需再加入擴(kuò)容,結(jié)束自旋。如果以上條件不成立,則使用CAS修改擴(kuò)容進(jìn)度下標(biāo)transferIndex嘗試占用一個(gè)步長(zhǎng)的區(qū)間,如果失敗則自旋繼續(xù)判定區(qū)間占用,修改成功則意味著區(qū)間占用成功,賦值本次區(qū)間的擴(kuò)容游標(biāo)i以及下標(biāo)邊界bound
區(qū)間占用成功后開(kāi)始從i到bound的擴(kuò)容階段,首先需要判斷i的合法性(需要在i和n之間,即判斷是不是已擴(kuò)容完畢,如–i的循環(huán)會(huì)使得i為負(fù)數(shù)表示擴(kuò)容完畢),如果沒(méi)占據(jù)到區(qū)間則判斷是不是處于于finishing階段,處于的話說(shuō)明自己以及是最后一個(gè)擴(kuò)容線程了,將新容器nextTable賦值為table,并設(shè)置新的擴(kuò)容閾值。如果不是處于finishing階段則因?yàn)楫?dāng)前線程沒(méi)占據(jù)到區(qū)間需要退出擴(kuò)容,那么響應(yīng)的擴(kuò)容特征值也要相應(yīng)的-1來(lái)完成擴(kuò)容線程量特征值的維護(hù),如果擴(kuò)容特征值維護(hù)失敗則需要繼續(xù)自旋嘗試退出,如果維護(hù)成功則判斷自己退出之后是不是只能先一個(gè)擴(kuò)容線程,因?yàn)槠鹗紨U(kuò)容線程的線程擴(kuò)容量特征值為+2,所以通過(guò)-2來(lái)判斷,如果自己退出之后僅剩一個(gè)線程在擴(kuò)容,那么就把finishing設(shè)置為true,以使其在擴(kuò)容工作結(jié)束后進(jìn)行新容器的賦值以及新擴(kuò)容閾值的維護(hù)。
如果i處于合法范圍內(nèi),說(shuō)明處于正常擴(kuò)容中,那么獲取i處的節(jié)點(diǎn),節(jié)點(diǎn)為空則利用一個(gè)CAS為其賦值一個(gè)ForwardingNode節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)是一個(gè)中繼節(jié)點(diǎn),意味著這個(gè)節(jié)點(diǎn)正處于擴(kuò)容狀態(tài),其hash值為特殊值MOVED(-1),在5 新數(shù)據(jù)載入-putVal 的過(guò)程中如果發(fā)現(xiàn)目標(biāo)節(jié)點(diǎn)hash值為MOVED,那么putVal線程就會(huì)暫停put操作,作為輔助擴(kuò)容線程先行擴(kuò)容容器,擴(kuò)容完畢后再進(jìn)行put操作。
如果i處節(jié)點(diǎn)不會(huì)空但是hash值已經(jīng)是MOVED,那么說(shuō)明節(jié)點(diǎn)已經(jīng)處于遷移狀態(tài)則跳過(guò)。
如果以上均不符合,說(shuō)明i合法,且i處擁有實(shí)際的數(shù)據(jù)節(jié)點(diǎn),那么使用i處的節(jié)點(diǎn)f作為鎖通過(guò)synchronized來(lái)達(dá)成線程安全的擴(kuò)容,因?yàn)閜utVal過(guò)程中也是用i處的節(jié)點(diǎn)f作為鎖進(jìn)行synchronized的,意味著對(duì)i處的操作擴(kuò)容和賦值只有一個(gè)過(guò)程能操作,以此來(lái)保證putVal和transfer的并發(fā)安全性。synchronized內(nèi)部的擴(kuò)容過(guò)程因?yàn)橐呀?jīng)保證了線程同步,因此和HashMap的擴(kuò)容過(guò)程區(qū)別并不大,這里不再描述,感興趣的話可以查看9: 從源碼看HashMap:一文看懂HashMap了解。

8 輔助擴(kuò)容-helpTransfer

5 新數(shù)據(jù)載入-putVal 中以及7 進(jìn)行擴(kuò)容-transfer中都提到過(guò)如果putVal過(guò)程中發(fā)現(xiàn)目標(biāo)節(jié)點(diǎn)是一個(gè)中繼節(jié)點(diǎn)ForwardingNode(hash值為特殊值MOVED)那么putVal會(huì)暫停put操作,通過(guò)helpTransfer方法進(jìn)行輔助擴(kuò)容:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
	Node<K,V>[] nextTab; int sc;
	if (tab != null && (f instanceof ForwardingNode) &&
		(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
		int rs = resizeStamp(tab.length);
		while (nextTab == nextTable && table == tab &&
			   (sc = sizeCtl) < 0) {
			if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
				sc == rs + MAX_RESIZERS || transferIndex <= 0)
				break;
			if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
				transfer(tab, nextTab);
				break;
			}
		}
		return nextTab;
	}
	return table;
}

解析: 這個(gè)方法是不是看起來(lái)很熟悉,是的,除了ForwardingNode判斷,其主要邏輯和6 維護(hù)與啟動(dòng)擴(kuò)容-addCount判定擴(kuò)容時(shí)的邏輯大致相當(dāng)。因此也不在額外表述。這里貼出來(lái)的目的僅僅是為了完整的體現(xiàn)putVal遭遇擴(kuò)容并發(fā)的處理過(guò)程。

以上就是java核心基礎(chǔ)之從源碼看ConcurrentHashMap的詳細(xì)內(nèi)容,更多關(guān)于JAVA核心知識(shí)之從源碼看ConcurrentHashMap的資料請(qǐng)關(guān)注W3Cschool其它相關(guān)文章!


0 人點(diǎn)贊