溫故知新-多線程-深入剖析AQS

目錄

  • 摘要
  • AbstractQueuedSynchronizer實現一把鎖
  • ReentrantLock
    • ReentrantLock的特點
    • Synchronized的基礎用法
    • ReentrantLock與AQS的關聯
    • AQS架構圖
    • acquire獲取鎖
      • tryAcquire
      • hasQueuedPredecessors
      • acquireQueued
      • setHead
      • shouldParkAfterFailedAcquire
      • parkAndCheckInterrupt
      • cancelAcquire
    • unlock解鎖
      • release
      • tryRelease
      • unparkSuccessor
    • 中斷恢復
  • 其它
  • 參考
  • 你的鼓勵也是我創作的動力
  • Posted by 微博@Yangsc_o
  • 原創文章,版權聲明:自由轉載-非商用-非衍生-保持署名 | Creative Commons BY-NC-ND 3.0

摘要

本文通過ReentrantLock來窺探AbstractQueuedSynchronizer(AQS)的實現原理,在看此文之前。你需要了解一下park、unpark的功能,請移步至上一篇《深入剖析park、unpark》;

AbstractQueuedSynchronizer實現一把鎖

根據AbstractQueuedSynchronizer的官方文檔,如果想實現一把鎖的,需要繼承AbstractQueuedSynchronizer,並需要重寫tryAcquire、tryRelease、可選擇重寫isHeldExclusively提供locked state、因為支持序列化,所以需要重寫readObject以便反序列化時恢復原始值、newCondition提供條件;官方提供的java代碼如下(官方文檔見參考連接);

public class MyLock implements Lock, java.io.Serializable {
    private static class Sync extends AbstractQueuedSynchronizer {
      
        // Acquires the lock if state is zero
        @Override
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provides a Condition
        Condition newCondition() {
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
      
       // Reports whether in locked state
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    /**
     * The sync object does all the hard work. We just forward to it.
     */
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }


    private static volatile Integer value = 0;

    public static void main(String[] args) {

        MyLock myLock = new MyLock();
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                myLock.lock();
                value ++;
                myLock.unlock();
            }).start();
        }
        System.out.println(value);
    }
}

上面是一個不可重入的鎖,它實現了一個鎖基礎功能,目的是為了跟ReentrantLock的實現做對比;

ReentrantLock

ReentrantLock的特點

ReentrantLock意思為可重入鎖,指的是一個線程能夠對一個臨界資源重複加鎖。ReentrantLock跟常用的Synchronized進行比較;

Synchronized的基礎用法

Synchronized的分析可以參考《深入剖析synchronized關鍵詞》,ReentrantLock可以創建公平鎖、也可以創建非公平鎖,接下來看一下ReentrantLock的簡單用法,非公平鎖實現比較簡單,今天重點是公平鎖;

public class ReentrantLockTest {

    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock(true);
        reentrantLock.lock();
        try {
            log.info("lock");
        } catch (Exception e) {
            log.error(e);
        } finally {
            reentrantLock.unlock();
            log.info("unlock");
        }
    }
}

ReentrantLock與AQS的關聯

先看一下加鎖方法lock

  • 非公平鎖lock方法

    compareAndSetState很好理解,通過CAS加鎖,如果加鎖失敗調用acquire;

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
  • 公平鎖lock方法
final void lock() {
    acquire(1);
}
  • AQS框架的處理流程

​ 線程繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續,分析實現原理

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

總結:公平鎖的上鎖是必須判斷自己是不是需要排隊;而非公平鎖是直接進行CAS修改計數器看能不能加鎖成功;如果加鎖不成功則乖乖排隊(調用acquire);所以不管公平還是不公平;只要進到了AQS隊列當中那麼他就會排隊;

AQS架構圖

美團畫的AQS的架構圖,很詳細,當有自定義同步器接入時,只需重寫第一層所需要的部分方法即可,不需要關注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操作時,先經過第一層的API進入AQS內部方法,然後經過第二層進行鎖的獲取,接着對於獲取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴於第五層的基礎數據提供層。

AQS核心思想是,如果被請求的共享資源空閑,那麼就將當前請求資源的線程設置為有效的工作線程,將共享資源設置為鎖定狀態;如果共享資源被佔用,就需要一定的阻塞等待喚醒機制來保證鎖分配。這個機制主要用的是CLH隊列的變體實現的,將暫時獲取不到鎖的線程加入到隊列中。

CLH:Craig、Landin and Hagersten隊列,是單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是通過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。

  • 非公平鎖的加鎖流程
  • 公平鎖的加鎖流程
  • 解鎖公平鎖和非公平鎖邏輯一致

加鎖:

  • 通過ReentrantLock的加鎖方法Lock進行加鎖操作。
  • 會調用到內部類Sync的Lock方法,由於Sync#lock是抽象方法,根據ReentrantLock初始化選擇的公平鎖和非公平鎖,執行相關內部類的Lock方法,本質上都會執行AQS的Acquire方法。
  • AQS的Acquire方法會執行tryAcquire方法,但是由於tryAcquire需要自定義同步器實現,因此執行了ReentrantLock中的tryAcquire方法,由於ReentrantLock是通過公平鎖和非公平鎖內部類實現的tryAcquire方法,因此會根據鎖類型不同,執行不同的tryAcquire。
  • tryAcquire是獲取鎖邏輯,獲取失敗后,會執行框架AQS的後續邏輯,跟ReentrantLock自定義同步器無關。
  • 流程:Lock -> acquire -> tryAcquire( or nonfairTryAcquire)

解鎖:

  • 通過ReentrantLock的解鎖方法Unlock進行解鎖。
  • Unlock會調用內部類Sync的Release方法,該方法繼承於AQS。
  • Release中會調用tryRelease方法,tryRelease需要自定義同步器實現,tryRelease只在ReentrantLock中的Sync實現,因此可以看出,釋放鎖的過程,並不區分是否為公平鎖。
  • 釋放成功后,所有處理由AQS框架完成,與自定義同步器無關。
  • 流程:unlock -> release -> tryRelease

acquire獲取鎖

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    }
}

tryAcquire

acquire方法首先會調tryAcquire方法,需要注意的是tryAcquire的結果做取反;根據前面分析,tryAcquire會調用子類的實現,ReentrantLock有兩個內部類,FairSync,NonfairSync,都繼承自Sync,Sync繼承AbstractQueuedSynchronizer;

實現方式差別在是否有hasQueuedPredecessors() 的判斷條件

  • 公平鎖實現
/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 獲取lock對象的上鎖狀態,如果鎖是自由狀態則=0,如果被上鎖則為1,大於1表示重入  
    int c = getState();
    if (c == 0) {
      	// hasQueuedPredecessors,判斷自己是否需要排隊
        // 下面我會單獨介紹,如果不需要排隊則進行cas嘗試加鎖
        // 如果加鎖成功則把當前線程設置為擁有鎖的線程
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
  	// 如果C不等於0,但是當前線程等於擁有鎖的線程則表示這是一次重入,那麼直接把狀態+1表示重入次數+1
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平鎖

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

hasQueuedPredecessors

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  • Node

先來看下AQS中最基本的數據結構——Node,Node即為上面CLH變體隊列中的節點。

static final class Node {
    static final Node SHARED = new Node(); // 表示線程以共享的模式等待鎖
    static final Node EXCLUSIVE = null; // 表示線程正在以獨佔的方式等待鎖
    static final int CANCELLED =  1; // 表示線程獲取鎖的請求已經取消了
    static final int SIGNAL    = -1; // 表示線程已經準備好了,就等資源釋放了
    static final int CONDITION = -2; // 表示節點在等待隊列中,節點線程等待喚醒
    static final int PROPAGATE = -3; // 當前線程處在SHARED情況下,該字段才會使用
    volatile int waitStatus; // 當前節點在隊列中的狀態
    volatile Node prev; // 前驅指針
    volatile Node next; // 後繼指針
    volatile Thread thread; // 表示處於該節點的線程
    Node nextWaiter; // 指向下一個處於CONDITION狀態的節點
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 返回前驅節點,沒有的話拋出npe
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

再看hasQueuedPredecessors,整個方法如果最後返回false,則去加鎖,如果返回true則不加鎖,因為這個方法被取反操作;hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。如果返回False,說明當前線程可以爭取共享資源;如果返回True,說明隊列中存在有效節點,當前線程必須加入到等待隊列中。

  • h != t && ((s = h.next) == null || s.thread != Thread.currentThread());

雙向鏈表中,第一個節點為虛節點,其實並不存儲任何信息,只是佔位。真正的第一個有數據的節點,是在第二個節點開始的。

  • 當h != t時: 如果(s = h.next) == null,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時隊列中有元素,需要返回True(這塊具體見下邊代碼分析)。
  • 如果(s = h.next) != null,說明此時隊列中至少有一個有效節點。
  • 如果此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節點中的線程與當前線程相同,那麼當前線程是可以獲取資源的;
  • 如果s.thread != Thread.currentThread(),說明等待隊列的第一個有效節點線程與當前線程不同,當前線程必須加入進等待隊列。

如果這上面沒有看懂,沒有關係,先來分析一下構建整個隊列的過程;

  • addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // tail為對尾,賦值給pred
    Node pred = tail;
    // 判斷pred是否為空,其實就是判斷對尾是否有節點,其實只要隊列被初始化了對尾肯定不為空
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
  • enq
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

用一張圖來分析一下,整個隊列構建過程;

  • (1)通過Node(Thread thread, Node mode) 方法構建一個node節點(node2),此時的nextWaiter為空,線程不為空,是當前線程;

  • (2)如果隊尾為空,則說明隊列未建立,調用enq構建第一個虛擬節點(node1),通過compareAndSetHead方法構建一個頭節點,需要注意的是該頭節點thread是null,後續很多都是用線程是否為null來判讀是否為第一個虛擬節點;

  • (3)將node1 cas設置為head

  • (4)將頭節點賦值為tail = head

  • (5)進入下一次for循環時,會走到else分支,會將傳入的node的指向頭部節點的next,此時node2的prev指向node1(tail)

  • (6)將node2 cas設置為tail;

  • (7)將node2指向node1的next;

    經過上面的步驟,就構建了一個長度為2的隊列;

添加第二個隊列時,走的是這段代碼,流程就簡單多了,代碼如下

if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
    }
}

再看一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());因為整個構建過程並不是原子操作,所以這個條件判斷,現在再是不是就看明白了?

  • 當h != t時(3)步驟已經完成: 如果(s = h.next) == null 此時步驟(4)未完成,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時隊列中有元素,需要返回True
  • 如果(s = h.next) != null,說明此時隊列中至少有一個有效節點。
  • 如果此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節點中的線程與當前線程相同,那麼當前線程是可以獲取資源的;
  • 如果s.thread != Thread.currentThread(),說明等待隊列的第一個有效節點線程與當前線程不同,當前線程必須加入進等待隊列。

acquireQueued

addWaiter方法其實就是把對應的線程以Node的數據結構形式加入到雙端隊列里,返回的是一個包含該線程的Node。而這個Node會作為參數,進入到acquireQueued方法中。acquireQueued方法可以對排隊中的線程進行“獲鎖”操作。總的來說,一個線程獲取鎖失敗了,被放入等待隊列,acquireQueued會把放入隊列中的線程不斷去獲取鎖,直到獲取成功或者不再需要獲取(中斷)。

下面通過代碼從“何時出隊列?”和“如何出隊列?”兩個方向來分析一下acquireQueued源碼:

final boolean acquireQueued(final Node node, int arg) {
    // 標記是否成功拿到資源
    boolean failed = true;
    try {
        // 標記等待過程中是否中斷過
        boolean interrupted = false;
        for (;;) {
            // 獲取當前節點的前驅節點,有兩種情況;1、上一個節點為頭部;2上一個節點不為頭部
            final Node p = node.predecessor();
            // 如果p是頭結點,說明當前節點在真實數據隊列的首部,就嘗試獲取鎖(頭結點是虛節點)
            // 因為第一次tryAcquire判斷是否需要排隊,如果需要排隊,那麼我就入隊,此處再重試一次
            if (p == head && tryAcquire(arg)) {
                // 獲取鎖成功,頭指針移動到當前node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 說明p為頭節點且當前沒有獲取到鎖(可能是非公平鎖被搶佔了)或者是p不為頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus為-1),防止無限循環浪費資源。具體兩個方法下面細細分析
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
       // 成功拿到資源,準備釋放
        if (failed)
            cancelAcquire(node);
    }
}

setHead

設置當前節點為頭節點,並且將node.thread為空(剛才提到判斷是否為頭部虛擬節點的條件就是node.thread == null。waitStatus狀態併為修改,等下我們再分析;

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

shouldParkAfterFailedAcquire

接下來看shouldParkAfterFailedAcquire代碼,需要注意的是,每一個新創建Node的節點是被下一個排隊的node設置為等待狀態為SIGNAL, 這裏比較難以理解為什麼需要去改變上一個節點的park狀態?

每個node都有一個狀態,默認為0,表示無狀態,-1表示在park;當時不能自己把自己改成-1狀態?因為你得確定你自己park了才是能改為-1;所以只能先park;在改狀態;但是問題你自己都park了;完全釋放CPU資源了,故而沒有辦法執行任何代碼了,所以只能別人來改;故而可以看到每次都是自己的后一個節點把自己改成-1狀態;

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前驅節點的狀態 
    int ws = pred.waitStatus;
    // 說明頭結點處於喚醒狀態
    if (ws == Node.SIGNAL)
        return true;
    // static final int CANCELLED =  1; // 表示線程獲取鎖的請求已經取消了
    // static final int SIGNAL    = -1; // 表示線程已經準備好了,就等資源釋放了
    // static final int CONDITION = -2; // 表示節點在等待隊列中,節點線程等待喚醒
    // static final int PROPAGATE = -3; // 當前線程處在SHARED情況下,該字段才會使用
    if (ws > 0) {
        do {
            // 把取消節點從隊列中剔除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 設置前任節點等待狀態為SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt

調用LockSupport.park掛起當前線程,自己已經park,無法再修改狀態了!

private final boolean parkAndCheckInterrupt() {
    // 調⽤用park()使線程進⼊入waiting狀態
    LockSupport.park(this);
    // 如果被喚醒,查看⾃自⼰己是不不是被中斷的,這⾥里里先清除⼀下標記位
    return Thread.interrupted(); 
}

shouldParkAfterFailedAcquire的整個流程還是比較清晰的,如果不清楚,可以參考美團畫的流程圖;

cancelAcquire

通過上面的分析,當failed為true時,也就意味着park結束,線程被喚醒了,for循環已經跳出,開始執行cancelAcquire,通過cancelAcquire方法,將Node的狀態標記為CANCELLED;代碼如下:

private void cancelAcquire(Node node) {
    // 將無效節點過濾
    if (node == null)
        return;
    // 設置該節點不關聯任何線程,也就是虛節點(上面已經提到,node.thread = null是判讀是否是頭節點的條件)
    node.thread = null;
    Node pred = node.prev;
    // 通過前驅節點,處理waitStatus > 0的node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 把當前node的狀態設置為CANCELLED,當下一個node排隊結束時,自己就會被上一行代碼處理掉;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    // 如果當前節點是尾節點,將從后往前的第一個非取消狀態的節點設置為尾節點,更新失敗的話,則進入else,如果更新成功,將tail的後繼節點設置為null
    if (node == tail && compareAndSetTail(node, pred)) {
        // 把自己設置為null
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 如果當前節點不是head的後繼節點
        // 1:判斷當前節點前驅節點的是否為SIGNAL
        // 2:如果不是,則把前驅節點設置為SINGAL看是否成功
        // 如果1和2中有一個為true,再判斷當前節點的線程是否為null
        // 如果上述條件都滿足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點 
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 如果當前節點是head的後繼節點,或者上述條件不滿足,那就喚醒當前節點的後繼節點
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

當前的流程:

  • 獲取當前節點的前驅節點,如果前驅節點的狀態是CANCELLED,那就一直往前遍歷,找到第一個waitStatus <= 0的節點,將找到的Pred節點和當前Node關聯,將當前Node設置為CANCELLED。

  • 根據當前節點的位置,考慮以下三種情況:

    (1) 當前節點是尾節點。

    (2) 當前節點是Head的後繼節點。

    (3) 當前節點不是Head的後繼節點,也不是尾節點。

(1)當前節點時尾節點

(2)當前節點是Head的後繼節點。

這張圖描述的是這段代碼:unparkSuccessor

Node s = node.next;
if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
            s = t;
}

(3)當前節點不是Head的後繼節點,也不是尾節點。

這張圖描述的是這段代碼跟(2)一樣;

通過上面的圖,你會發現所有的變化都是對Next指針進行了操作,而沒有對Prev指針進行操作,原因是執行cancelAcquire的時候,當前節點的前置節點可能已經從隊列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),也就是下圖中代碼1和代碼2直接的間隙就會出現這種情況,此時修改Prev指針,有可能會導致Prev指向另一個已經移除隊列的Node,因此這塊變化Prev指針不安全。

unlock解鎖

解鎖時並不區分公平和不公平,因為ReentrantLock實現了鎖的可重入,可以進一步的看一下時如何處理的,上代碼:

public void unlock() {
    sync.release(1);
}

release

public final boolean release(int arg) {
    // 自定義的tryRelease如果返回true,說明該鎖沒有被任何線程持有
    if (tryRelease(arg)) {
        // 獲取頭結點
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 頭結點不為空並且頭結點的waitStatus不是初始化節點情況,解除線程掛起狀態
            unparkSuccessor(h);
        return true;
    }
    return false;
}

這裏的判斷條件為什麼是h != null && h.waitStatus != 0

  1. h == null Head還沒初始化。初始情況下,head == null,第一個節點入隊,Head會被初始化一個虛擬節點。如果還沒來得及入隊,就會出現head == null 的情況。
  2. h != null && waitStatus == 0 表明後繼節點對應的線程仍在運行中,不需要喚醒。
  3. h != null && waitStatus < 0 表明後繼節點可能被阻塞了,需要喚醒,(還記得一個node是在shouldParkAfterFailedAcquire方法中被設置為SIGNAL = -1的吧?不記得翻看一下上面吧)

tryRelease

protected final boolean tryRelease(int releases) {
    // 減少可重入次數,setState(c);
    int c = getState() - releases;
    // 當前線程不是持有鎖的線程,拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果持有線程全部釋放,將當前獨佔鎖所有線程設置為null,並更新state
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

unparkSuccessor

這個方法在cancelAcquire其實也用到了,簡單分析一下

// 如果當前節點是head的後繼節點,或者上述條件不滿足,就喚醒當前節點的後繼節點unparkSuccessor(node);

private void unparkSuccessor(Node node) {
    // 獲取結點waitStatus,CAS設置狀態state=0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取當前節點的下一個節點
    Node s = node.next;
    // 如果下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled的節點
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就從尾部節點開始找,到隊首,找到隊列第一個waitStatus<0的節點。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果當前節點的下個節點不為空,而且狀態<=0,就把當前節點unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}

為什麼要從后往前找第一個非Cancelled的節點呢?

原因1:addWaiter方法並非原子,構建鏈表結構時如下圖中 1、2間隙執行unparkSuccessor,此時鏈表是不完整的,沒辦法從前往後找了;

原因2:還有一點原因,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node;

中斷恢復

喚醒后,會執行return Thread.interrupted();,這個函數返回的是當前執行線程的中斷狀態,並清除。

private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	return Thread.interrupted();
}

acquireQueued代碼,當parkAndCheckInterrupt返回True或者False的時候,interrupted的值不同,但都會執行下次循環。如果這個時候獲取鎖成功,就會把當前interrupted返回。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				interrupted = true;
			}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

如果acquireQueued為True,就會執行selfInterrupt方法。

該方法其實是為了中斷線程。但為什麼獲取了鎖以後還要中斷線程呢?這部分屬於Java提供的協作式中斷知識內容,感興趣同學可以查閱一下。這裏簡單介紹一下:

  1. 當中斷線程被喚醒時,並不知道被喚醒的原因,可能是當前線程在等待中被中斷,也可能是釋放了鎖以後被喚醒。因此我們通過Thread.interrupted()方法檢查中斷標記(該方法返回了當前線程的中斷狀態,並將當前線程的中斷標識設置為False),並記錄下來,如果發現該線程被中斷過,就再中斷一次。
  2. 線程在等待資源的過程中被喚醒,喚醒后還是會不斷地去嘗試獲取鎖,直到搶到鎖為止。也就是說,在整個流程中,並不響應中斷,只是記錄中斷記錄。最後搶到鎖返回了,那麼如果被中斷過的話,就需要補充一次中斷。

這裏的處理方式主要是運用線程池中基本運作單元Worder中的runWorker,通過Thread.interrupted()進行額外的判斷處理,可以看下ThreadPoolExecutor源碼的判斷條件;

其它

AQS在JUC中有⽐比較⼴廣泛的使⽤用,以下是主要使⽤用的地⽅方:

  • ReentrantLock:使⽤用AQS保存鎖重複持有的次數。當⼀一個線程獲取鎖時, ReentrantLock記錄當
    前獲得鎖的線程標識,⽤用於檢測是否重複獲取,以及錯誤線程試圖解鎖操作時異常情況的處理理。
  • Semaphore:使⽤用AQS同步狀態來保存信號量量的當前計數。 tryRelease會增加計數,
    acquireShared會減少計數。
  • CountDownLatch:使⽤用AQS同步狀態來表示計數。計數為0時,所有的Acquire操作
    (CountDownLatch的await⽅方法)才可以通過。
  • ReentrantReadWriteLock:使⽤用AQS同步狀態中的16位保存寫鎖持有的次數,剩下的16位⽤用於保
    存讀鎖的持有次數。
  • ThreadPoolExecutor: Worker利利⽤用AQS同步狀態實現對獨佔線程變量量的設置(tryAcquire和
    tryRelease)。

至此,通過ReentrantLock分析AQS的實現原理一家完畢,需要說明的是,此文深度參考了美團分析的ReentrantLock,是參考鏈接的第三個,有興趣可以對比差異,感謝!

參考

JDK API 文檔

Java的LockSupport.park()實現分析

[從ReentrantLock的實現看AQS的原理及應用

[Thread的中斷機制(interrupt)

你的鼓勵也是我創作的動力

打賞地址

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※回頭車貨運收費標準