App下載

解析 Java 是如何通過AQS實現(xiàn)數(shù)據(jù)組織?

猿友 2021-07-20 11:39:06 瀏覽數(shù) (1907)
反饋

并發(fā)是Java語言中的一個很重要的概念,而說起并發(fā)就繞不過AQS。AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現(xiàn)都依賴于它。接下來將和大家簡單地介紹一下 AQS。

引言

從本篇文章開始,我們將介紹 Java AQS 的實現(xiàn)方式,本文先介紹 AQS 的內(nèi)部數(shù)據(jù)是如何組織的,后面的文章中再分別介紹 AQS 的各個部門實現(xiàn)。

AQS

通過前面的介紹,大家一定看出來了,上述的各種類型的鎖和一些線程控制接口(CountDownLatch 等),最終都是通過 AQS 來實現(xiàn)的,不同之處只在于 tryAcquire 等抽象函數(shù)如何實現(xiàn)。從這個角度來看,AQS(AbstractQueuedSynchronizer) 這個基類設(shè)計的真的很不錯,能夠包容各種同步控制方案,并提供了必須的下層依賴:比如阻塞,隊列等。接下來我們就來揭開它神秘的面紗。

內(nèi)部數(shù)據(jù)

AQS 顧名思義,就是通過隊列來組織修改互斥資源的請求。當(dāng)這個資源空閑時間,那么修改請求可以直接進(jìn)行,而當(dāng)這個資源處于鎖定狀態(tài)時,就需要等待,AQS 會將所有等待的請求維護(hù)在一個類似于 CLH 的隊列中。CLH:Craig、Landin and Hagersten隊列,是單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是通過將每條請求共享資源的線程封裝成一個節(jié)點(diǎn)來實現(xiàn)鎖的分配。主要原理圖如下:

圖中的 state 是一個用 volatile 修飾的 int 變量,它的使用都是通過 CAS 來進(jìn)行的,而 FIFO 隊列完成請求排隊的工作,隊列的操作也是通過 CAS 來進(jìn)行的,正因如此該隊列的操作才能達(dá)到理想的性能要求。

通過 CAS 修改 state 比較容易,大家應(yīng)該都能理解,但是如果要通過 CAS 維護(hù)一個雙向隊列要怎么做呢?這里我們看一下 AQS 中 CLH 隊列的實現(xiàn)。在 AQS 中有兩個指針一個指針指向了隊列頭,一個指向了隊列尾。它們都是懶初始化的,也就是說最初都為null。

/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

隊列中的每個節(jié)點(diǎn),都是一個 Node 實例,該實例的第一個關(guān)鍵字段是 waitState,它表述了當(dāng)前節(jié)點(diǎn)所處的狀態(tài),通過 CAS 進(jìn)行修改:

  • SIGNAL:表示當(dāng)前節(jié)點(diǎn)承擔(dān)喚醒后繼節(jié)點(diǎn)的責(zé)任
  • CANCELLED:表示當(dāng)前節(jié)點(diǎn)已經(jīng)超時或者被打斷
  • CONDITION:表示當(dāng)前節(jié)點(diǎn)正在 Condition 上等待(通過鎖可以創(chuàng)建 Condition 對象)
  • PROPAGATE:只會設(shè)置在 head 節(jié)點(diǎn)上,用于表明釋放共享鎖時,需要將這個行為傳播到其他節(jié)點(diǎn)上,這個我們稍后詳細(xì)介紹。
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes.  It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    //...
}

因為是雙向隊列,所以 Node 實例中勢必有 prev 和 next 指針,此外 Node 中還會保存與其對應(yīng)的線程。最后是 nextWaiter,當(dāng)一個節(jié)點(diǎn)對應(yīng)了共享請求時,nextWaiter 指向了 Node. SHARED 而當(dāng)一個節(jié)點(diǎn)是排他請求時,nextWaiter 默認(rèn)指向了 Node. EXCLUSIVE 也就是 null。我們知道 AQS 也提供了 Condition 功能,該功能就是通過 nextWaiter 來維護(hù)在 Condition 上等待的線程。也就是說這里的 nextWaiter 在鎖的實現(xiàn)部分中,扮演者共享鎖和排它鎖的標(biāo)志位,而在條件等待隊列中,充當(dāng)鏈表的 next 指針。

同步隊列

接下來,我們由最常見的入隊操作出發(fā),介紹 AQS 框架的實現(xiàn)與使用。從下面的代碼中可以看到入隊操作支持兩種模式,一種是排他模式,一種是共享模式,分別對應(yīng)了排它鎖場景和共享鎖場景。

  1. 當(dāng)任意一種請求,要入隊時,先會構(gòu)建一個 Node 實例,然后獲取當(dāng)前 AQS 隊列的尾結(jié)點(diǎn),如果尾結(jié)點(diǎn)為空,就是說隊列還沒初始化,初始化過程在后面 enq 函數(shù)中實現(xiàn)
  2. 這里我們先看初始化之后的情況,即 tail != null,先將當(dāng)前 Node 的前向指針 prev 更新,然后通過 CAS 將尾結(jié)點(diǎn)修改為當(dāng)前 Node,修改成功時,再更新前一個節(jié)點(diǎn)的后向指針 next,因為只有修改尾指針過程是原子的,所以這里會出現(xiàn)新插入一個節(jié)點(diǎn)時,之前的尾節(jié)點(diǎn) previousTail 的 next 指針為null的情況,也就是說會存在短暫的正向指針和反向指針不同步的情況,不過在后面的介紹中,你會發(fā)現(xiàn) AQS 很完備地避開了這種不同步帶來的風(fēng)險(通過從后往前遍歷)
  3. 如果上述操作成功,則當(dāng)前線程已經(jīng)進(jìn)入同步隊列,否則,可能存在多個線程的競爭,其他線程設(shè)置尾結(jié)點(diǎn)成功了,而當(dāng)前線程失敗了,這時候會和尾結(jié)點(diǎn)未初始化一樣進(jìn)入 enq 函數(shù)中。
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // 已經(jīng)進(jìn)行了初始化
        node.prev = pred;
        // CAS 修改尾節(jié)點(diǎn)
        if (compareAndSetTail(pred, node)) {
            // 成功之后再修改后向指針
            pred.next = node;
            return node;
        }
    }
    // 循環(huán) CAS 過程和初始化過程
    enq(node);
    return node;
}

正常通過 CAS 修改數(shù)據(jù)都會在一個循環(huán)中進(jìn)行,而這里的 addWaiter 只是在一個 if 中進(jìn)行,這是為什么呢?實際上,大家看到的 addWaiter 的這部分 CAS 過程是一個快速執(zhí)行線,在沒有競爭時,這種方式能省略不少判斷過程。當(dāng)發(fā)生競爭時,會進(jìn)入 enq 函數(shù)中,那里才是循環(huán) CAS 的地方。

  1. 整個 enq 的工作在一個循環(huán)中進(jìn)行
  2. 先會檢查是否未進(jìn)行初始化,是的話,就設(shè)置一個虛擬節(jié)點(diǎn) Node 作為 head 和 tail,也就是說同步隊列的第一個節(jié)點(diǎn)并不保存實際數(shù)據(jù),只是一個保存指針的地方
  3. 初始化完成后,通過 CAS 修改尾節(jié)點(diǎn),直到修改成功為止,最后修復(fù)后向指針
/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {// 在一個循環(huán)中進(jìn)行 CAS 操作
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // CAS 修改尾節(jié)點(diǎn)
            if (compareAndSetTail(t, node)) {
                // 成功之后再修改后向指針
                t.next = node;
                return t;
            }
        }

以上就是通過AQS實現(xiàn)數(shù)據(jù)組織的詳細(xì)內(nèi)容,更多關(guān)于AQS數(shù)據(jù)組織的資料請關(guān)注W3Cschool其它相關(guān)文章!


0 人點(diǎn)贊