引言

AQS是用來構建鎖和其他同步組件的基礎框架,它也是Java三大並發工具類(CountDownLatch、CyclicBarrier、Semaphore)的基礎。ReentrantLock,甚至BlockingQueue也是基於它的實現,可以說是非常重要了。

簡單介紹一下,AQS其實就是一個類,全稱是AbstractQueuedSynchronizer,隊列同步器。本文的重點是研究它的源碼,其他的基礎就不多說啦。想了解AQS基礎的同學可以看一下3y大佬的文章。

後續有機會我會分享自己對ReentrantLock,LinkedBlockingQueue,線程池源碼的理解~文章導讀:
  • AQS重要成員變數
  • 內部類-Node(等待隊列的實現)
  • 獲取資源(acquire)源碼
  • 釋放資源(release)源碼
  • 內部類-ConditionObject(條件隊列的實現)
  • 總結

一、AQS的重要成員變數

AQS中主要維護了state(鎖狀態的表示)和一個可阻塞的等待隊列。

state是臨界資源,也是鎖的描述。表示有多少線程獲取了鎖。

private volatile int state;

關於state的get,set方法就不貼了,重要的是有個通過CAS修改state的方法。

//設置期望值,想修改的值。通過CAS操作實現。
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

除此之外,還維護了等待隊列(也叫CHL隊列,同步隊列)的頭節點和尾節點。

private transient volatile Node head;

private transient volatile Node tail;

CHL隊列由鏈表實現,以自旋的方式獲取資源,是可阻塞的先進先出的雙向隊列。通過自旋和CAS操作保證節點插入和移除的原子性。當有線程獲取鎖失敗,就被添加到隊列末尾。下面我們來看看每個節點是怎麼實現的。

二、內部類-Node

AQS的工作模式分為獨佔模式和共享模式,記錄在節點的信息中。它還使用了模板方法設計模式,定義一個操作中演算法的骨架,而將一些步驟的實現延遲到子類中。比如獲取資源的方法就能很好的品味模板模式。一般地,它的實現類只實現一種模式,ReentrantLock就實現了獨佔模式;但也有例外,ReentrantReadAndWriteLock實現了獨佔模式和共享模式。下面來看Node相關源碼。

//當前節點處於共享模式的標記
static final Node SHARED = new Node();

//當前節點處於獨佔模式的標記
static final Node EXCLUSIVE = null;

//線程被取消了
static final int CANCELLED = 1;
//釋放資源後需喚醒後繼節點
static final int SIGNAL = -1;
//等待condition喚醒
static final int CONDITION = -2;
//工作於共享鎖狀態,需要向後傳播,
//比如根據資源是否剩餘,喚醒後繼節點
static final int PROPAGATE = -3;

//等待狀態,有1,0,-1,-2,-3五個值。分別對應上面的值
volatile int waitStatus;

//前驅節點
volatile Node prev;

//後繼節點
volatile Node next;

//等待鎖的線程
volatile Thread thread;

//等待條件的下一個節點,ConditonObject中用到
Node nextWaiter;

對於等待狀態(waitStatus)做一個解釋。

  • CANCELLED =1 線程被取消了
  • SIGNAL =-1 釋放資源後需喚醒後繼節點
  • CONDITION = -2 等待condition喚醒
  • PROPAGATE = -3 (共享鎖)狀態需要向後傳播
  • 0 初始狀態,正常狀態

CANCELLED

作廢狀態,該節點的線程由於超時,中斷等原因而處於作廢狀態。是不可逆的,一旦處於這個狀態,說明應該將該節點移除了。

SIGNAL

待喚醒後繼狀態,當前節點的線程處於此狀態,後繼節點會被掛起,當前節點釋放鎖或取消之後必須喚醒它的後繼節點。

CONDITION

等待狀態,表明節點對應的線程因為不滿足一個條件(Condition)而被阻塞。

三、獲取資源(鎖)

獲取釋放資源其實都是對state變數的修改,有的文章會管他叫鎖,筆者更喜歡叫資源。

獲取資源的方法有acquire(),acquiredShared()。先來看acquire(),該方法只工作於獨佔模式

3.1 acquire()--獨佔模式獲取資源

aquire():以獨佔模式獲取資源,忽略中斷(ReentrantLock.lock()中調用了這個方法)

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
//讓線程處於一種自旋狀態,
//嘗試讓該線程重新獲取鎖!當條件滿足獲取到了鎖則可以從自旋過程中
//退出,否則繼續。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

之前也提到了AQS使用了模板方法模式,其實tryAcuire()方法就是一個鉤子方法。在AQS中,此方法會拋出UnsupportedOperationException,所以需要子類去實現。tryAcquire(arg)返回false,其實就是獲取鎖失敗的情況。這時候就需要做自旋,重新獲取。

addWaiter():將當前線程插入至隊尾,返回在等待隊列中的節點(就是處理了它的前驅後繼)。

private Node addWaiter(Node mode) {
//把當前線程封裝為node,指定資源訪問模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果tail不為空,把node插入末尾
if (pred != null) {
node.prev = pred;
//此時可能有其他線程插入,所以使用CAS重新判斷tail
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果tail為空,說明隊列還沒有初始化,執行enq()
enq(node);
return node;
}

enq():將節點插入隊尾,失敗則自旋,直到成功。

private Node enq(final Node node) {
for (;;) {
Node t = tail;
//雖然tail==null才會執行本方法
//但是可能剛好有其他線程插入,會導致
//之前的判斷失效,所以重新判斷tail是否為空
//隊尾為空,說明隊列中沒有節點
//初始化頭尾節點
if (t == null) {
if (compareAndSetHead(new Node()))
//初始化完成後,接著走下一個循環,
//直到node正常插入尾部
tail = head;
} else {
//下面就是鏈表的正常插入操作了
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued():自旋方式獲取資源並判斷是否需要被掛起。

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//還是自旋嘗試獲取鎖資源
for (;;) {
final Node p = node.predecessor();
//如果節點的前驅是隊列的頭節點並且能拿到資源
//成功後則返回中斷位結束
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire(Node, Node)檢測當前節點是否應該park()
//parkAndCheckInterrupt()用於中斷當前節點中的線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire():判斷當前節點是否應該被掛起。下面涉及到的等待狀態,這裡再回憶一下,CANCELLED =1,SIGNAL =-1,CONDITION = -2,PROPAGATE = -3,0

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前驅節點的狀態是SIGNAL,說明前驅節點釋放資源後會通知自己
//此時當前節點可以安全的park(),因此返回true
return true;
if (ws > 0) {
//前驅節點的狀態是CANCLLED,說明前置節點已經放棄獲取資源了
//此時一直往前找,直到找到最近的一個處於正常等待狀態的節點
//並排在它後面,返回false
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驅節點的狀態是0或PROPGATE,則利用CAS將前置節點的狀態置
//為SIGNAL,讓它釋放資源後通知自己
//如果前置節點剛釋放資源,狀態就不是SIGNAL了,這時就會失敗
// 返回false
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt():若確定有必要park,才會執行此方法。

private final boolean parkAndCheckInterrupt() {
//使用LockSupport,掛起當前線程
LockSupport.park(this);
return Thread.interrupted();
}

selfInterrupt():對當前線程產生一個中斷請求。能走到這個方法,說明acquireQueued()返回true,就進行自我中斷。

static void selfInterrupt() {
Thread.currentThread().interrupt();
}

到這裡,獲取資源的流程就走完了,接下來總結一下。

aquire的步驟:1)tryAcquire()嘗試獲取資源。

2)如果獲取失敗,則通過addWaiter(Node.EXCLUSIVE), arg)方法把當前線程添加到等待隊列隊尾,並標記為獨佔模式。

3)插入等待隊列後,並沒有放棄獲取資源,acquireQueued()自旋嘗試獲取資源。根據前置節點狀態狀態判斷是否應該繼續獲取資源。如果前驅是頭結點,繼續嘗試獲取資源;

4)在每一次自旋獲取資源過程中,失敗後調用shouldParkAfterFailedAcquire(Node, Node)檢測當前節點是否應該park()。若返回true,則調用parkAndCheckInterrupt()中斷當前節點中的線程。若返回false,則接著自旋獲取資源。當acquireQueued(Node,int)返回true,則將當前線程中斷;false則說明拿到資源了。

5)在進行是否需要掛起的判斷中,如果前置節點是SIGNAL狀態,就掛起,返回true。如果前置節點狀態為CANCELLED,就一直往前找,直到找到最近的一個處於正常等待狀態的節點,並排在它後面,返回false,acquireQueed()接著自旋嘗試,回到3)。

6)前置節點處於其他狀態,利用CAS將前置節點狀態置為SIGNAL。當前置節點剛釋放資源,狀態就不是SIGNAL了,導致失敗,返回false。但凡返回false,就導致acquireQueed()接著自旋嘗試。

7)最終當tryAcquire(int)返回false,acquireQueued(Node,int)返回true,調用selfInterrupt(),中斷當前線程。

3.2 acquireShared()--共享模式獲取資源

接下來簡單說下共享模式下獲取資源的流程。acquireShared():以共享模式獲取對象,忽略中斷。先是tryAcquireShared(int)嘗試直接去獲取資源,如果成功,acquireShared(int)就結束了;否則,調用doAcquireShared(Node)將線程加入等待隊列,直到獲取到資源為止。

public final void acquireShared(int arg) {
//模板方法模式,tryAcquireShared由子類實現
//想看的話推薦讀寫鎖的源碼,這裡就不細述了
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

doAcquireShared():實現上和acquire()方法差不多,就是多判斷了是否還有剩餘資源,喚醒後繼節點。

private void doAcquireShared(int arg) {
//將線程加入等待隊列,設置為共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//自旋嘗試獲取資源
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//設置頭節點,且如果還有剩餘資源,喚醒後繼節點獲取資源
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//是否需要被掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

3.3 hasQueuedPredecessors()--公平鎖在tryAqcuire()時調用

這裡補充一個方法,ReentrantLock如果是公平鎖的話,會調用AQS中的這個方法,算是後續文章的鋪墊吧。boolean hasQueuedPredecessors():判斷當前線程是否位於CLH同步隊列中的第一個。如果是則返回flase,否則返回true。

public final boolean hasQueuedPredecessors() {
//判斷當前節點在等待隊列中是否為頭節點的後繼節點(頭節點不存儲數據),
//如果不是,則說明有線程比當前線程更早的請求資源,
//根據公平性,當前線程請求資源失敗。
//如果當前節點沒有前驅節點的話,纔有做後面的邏輯判斷的必要性
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

3.4 doAcquireNanos()--獨佔模式下在規定時間內獲取鎖

這個方法在ReentrantLock.tryLock()過程中被調用。

doAcquireNanos():這個方法只工作於獨佔模式,自旋獲取資源超時後則返回false;如果有必要掛起且未超時則掛起。

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//計算截至時間
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//獲取鎖成功後,出隊
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
//重新計算超時時間
nanosTimeout = deadline - System.nanoTime();
//超時則返回false
if (nanosTimeout <= 0L)
return false;
//否則判斷是否需要被阻塞,阻塞規定時間
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

3.5 doAcquireInterruptibly--獲取鎖時響應中斷

這個方法在ReentrantLock.lockInterruptibly()過程中被調用。doAcquireInterruptibly():獨佔模式下在獲取鎖時會阻塞,但是能響應中斷請求。

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//添加到等待隊列,包裝成Node
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
//自旋,直到前驅節點等待狀態為SIGNAL,檢查中斷標誌
//符合條件則阻塞當前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//當前線程被阻塞後,會中斷
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

四、釋放資源(鎖)

同樣的,釋放資源也分為釋放獨佔鎖資源(release())和共享鎖(releaseShared())資源。

先來看對於獨佔鎖的釋放。

4.1 release()--獨佔模式釋放資源

release():工作於獨佔模式,首先調用子類的tryRelease()方法釋放鎖,然後喚醒後繼節點,在喚醒的過程中,需要判斷後繼節點是否滿足情況,如果後繼節點不為空且不是作廢狀態,則喚醒這個後繼節點,否則從tail節點向前尋找合適的節點,如果找到,則喚醒。

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

unparkSuccessor():嘗試找到下一位繼承人,就是確定下一個獲取資源的線程,喚醒指定節點的後繼節點。

private void unparkSuccessor(Node node) {
//如果狀態為負說明是除CANCEL以外的狀態,
//嘗試在等待信號時清除。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
//下一個節點為空或CANCELLED狀態
//則從隊尾往前找,找到正常狀態的節點作為之後的繼承人
//也就是下一個能拿到資源的節點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//此時找到繼承人了,那麼就喚醒它
if (s != null)
LockSupport.unpark(s.thread);
}

4.2 releaseShared()--共享模式釋放資源

releaseShared():在釋放一部分資源後就可以通知其他線程獲取資源。

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

doReleaseShared():把當前結點的狀態由SIGNAL設置為PROPAGATE狀態。

private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
//將自旋嘗試修改等待狀態為0
continue;
//喚醒下一個被阻塞的節點
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//設置為PROPAGATE狀態
continue;
}
//上面代碼塊執行過程中如果head變更了,就接著循環嘗試
if (h == head)
break;
}
}

五、內部類-ConditionObject

ConditionObject實現了Condition介面。用於線程間的通信,能夠把鎖粒度減小。重點是await()和signal()。這個內部類還維護了一個condition隊列,而且Node.nextWaiter就是用來將condition連接起來的。

//condition隊頭
private transient Node firstWaiter;
//condition隊尾
private transient Node lastWaiter;
//發生了中斷,但在後續不拋出中斷異常,而是「補上」這次中斷
private static final int REINTERRUPT = 1;
//發生了中斷,且在後續需要拋出中斷異常
private static final int THROW_IE = -1;

5.1 await()--阻塞等待方法

5.1.1 await的流程

await():當前線程處於阻塞狀態,直到調用signal()或中斷才能被喚醒。

1)將當前線程封裝成node且等待狀態為CONDITION。

2)釋放當前線程持有的所有資源,讓下一個線程能獲取資源。3)加入到條件隊列後,則阻塞當前線程,等待被喚醒。

4)如果是因signal被喚醒,則節點會從條件隊列轉移到等待隊列;如果是因中斷被喚醒,則記錄中斷狀態。兩種情況都會跳出循環。

5)若是因signal被喚醒,就自旋獲取資源;否則處理中斷異常。

public final void await() throws InterruptedException {
//如果被中斷,就處理中斷異常
if (Thread.interrupted())
throw new InterruptedException();
//初始化鏈表的功能,設置當前線程為鏈尾
Node node = addConditionWaiter();
//釋放當前節點持有的所有資源
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果當前線程不在等待隊列中,
//說明此時一定在條件隊列裏,將其阻塞。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//說明中斷狀態發生變化
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//當前線程執行了signal方法會經過這個,也就是重新將當前線程加入同步隊列中
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//刪除條件隊列中不滿足要求的元素
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//處理被中斷的情況
if (interruptMode != 0)
//這裡是個難點,具體的實現我自己也有點不理解
//就把知道的都寫出來
//如果是THROW_IE,說明signal之前發生中斷
//如果是REINTERRUPT,signal之後中斷,
//所以成功獲取資源後會調用selfInterrupt()
reportInterruptAfterWait(interruptMode);
}

addConditionWaiter():將當前線程封裝成節點,添加到條件隊列尾部,並返回當前節點。

private Node addConditionWaiter() {
Node t = lastWaiter;
// 判斷隊尾元素,如果非條件等待狀態則清理出去
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
//可能t之前引用的節點被刪除了,所以要重新引用
t = lastWaiter;
}
//這個節點就表示當前線程
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//說明條件按隊列中沒有元素
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

unlinkCancelledWaiters():遍歷一遍條件隊列,刪除非CONDITION狀態的節點。

private void unlinkCancelledWaiters() {
Node t = firstWaiter;
//記錄在循環中上一個waitStatus有效的節點
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
//再次判斷等待狀態,保證節點都是CONDITION狀態
//確保當前節點無效後刪除引用
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
//否則就直接加到隊尾的後面
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
//記錄有效的節點並向後遍歷
trail = t;
t = next;
}
}

5.1.2 await中關於中斷的處理

通過對上面代碼的觀察,我們知道await()調用了checkInterruptWhileWaiting()。

關於中斷這一塊,我自己看的也比較迷,就把一些自己能理解的地方標註一下。checkInterruptWhileWaiting():判斷在阻塞過程中是否被中斷。如果返回THROW_IE,則表示線程在調用signal()之前中斷的;如果返回REINTERRUPT,則表明線程在調用signal()之後中斷;如果返回0則表示沒有被中斷。

private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

transferAfterCancelledWait():線程是否因為中斷從park中喚醒。

final boolean transferAfterCancelledWait(Node node) {
//如果修改成功,暫且認為中斷髮生後,signal()被調用
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
//true表示中斷先於signal發生
return true;
}
//~~不理解~~
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

5.2.1 signal()--喚醒condition隊列中的線程

signal():喚醒一個被阻塞的線程。

public final void signal() {
//判斷當前線程是否為資源的持有者
//這也是必須在lock()與unlock()代碼中間執行的原因
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//開始喚醒條件隊列的第一個節點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

doSignal():將條件隊列的頭節點從條件隊列轉移到等待隊列,並且,將該節點從條件隊列刪除。

private void doSignal(Node first) {
do {
//後續的等待條件為空,說明condition隊列中只有一個節點
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//transferForSignal()是真正喚醒頭節點的地方
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

transferForSignal():將節點放入等待隊列並喚醒。並不需要在條件隊列中移除,因為條件隊列每次插入時都會把狀態不為CONDITION的節點清理出去。

final boolean transferForSignal(Node node) {
//當前節點等待狀態改變失敗,則說明當前節點不是CONDITION
//狀態,那就不能進行接下來的操作,返回false
//0是正常狀態
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

//放入等待隊列隊尾中,並返回之前隊列的前一個節點
Node p = enq(node);
int ws = p.waitStatus;
//如果節點沒有被取消,或更改狀態失敗,則喚醒被阻塞的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

5.2.2 signalAll()--喚醒condition隊列中所有線程

signalAll()本質上還是調用了doSignalAll()doSignalAll():遍歷條件隊列,插入到等待隊列中。

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

5.3 awaitNanos()--超時機制

這裡補充一個方法awaitNanos(),是我看阻塞隊列源碼中遇到的。awaitNanos():輪詢檢查線程是否在同步線程上,如果在則退出自旋。否則檢查是否已超過解除掛起時間,如果超過,則退出掛起,否則繼續掛起線程到等待解除掛起。退出掛起之後,採用自旋的方式競爭鎖。

public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
//採用自旋的方式檢查是否已在等待隊列當中
while (!isOnSyncQueue(node)) {
//如果掛起超過一定的時間,則退出
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//繼續掛起線程
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
//採用自旋的方式競爭鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}

總結

關於AQS我覺得比較重要的就是獲取資源和釋放資源的方法,裡面用到了大量的CAS操作和自旋。AQS裡面維護了兩個隊列,一個是等待隊列(CHL),還有一個是條件隊列(condition)。

acquire()嘗試獲取資源,如果獲取失敗,將線程插入等待隊列。插入後,並沒有放棄獲取資源,而是根據前置節點狀態狀態判斷是否應該繼續獲取資源。如果前置節點是頭結點,繼續嘗試獲取資源;如果前置節點是SIGNAL狀態,就中斷當前線程,否則繼續嘗試獲取資源。直到當前線程被阻塞或者獲取到資源,結束。

release()釋放資源,需要喚醒後繼節點,判斷後繼節點是否滿足情況。如果後繼節點不為空且不是作廢狀態,則喚醒這個後繼節點;否則從尾部往前面找適合的節點,找到則喚醒。調用await(),線程會進入條件隊列,等待被喚醒,喚醒後以自旋方式獲取資源或處理中斷異常;調用signal(),線程會插入到等待隊列,喚醒被阻塞的線程。

如有不當之處,歡迎指出~

如果喜歡我的文章,歡迎關注我的專欄~

參考文章:Java並發編程札記,AQS簡簡單單過一遍,Condition源碼分析,JUC.Condition學習筆記[附詳細源碼解析]


推薦閱讀:
相關文章