參考:bilibili.com/video/av29

《java並發編程之美》

AQS, AbstractQueuedSynchronizer,抽象同步隊列。並發包里的一個核心類。

在這裡有必要再貼一波,不然怕你們忘了:

AQS是一個FIFO的雙向隊列,其內部通過節點head和tail記錄隊首和隊尾元素,隊列元素的類型為node。其中node中的thread變數用來存儲放進AQS隊列裡面的線程。node節點內部的shared是用來標記該線程是獲取共享資源時被阻塞掛起後放入AQS隊列的,exclusive是用來標記線程是獲取獨佔資源時被掛起後放入AQS隊列的。waitstatus記錄當前線程的等待狀態,可以為cancelled(線程被取消了),signal(線程需要被喚醒),condition(線程在條件隊列裡面等待),propagate(釋放共享資源時需要通知其他節點)。prev記錄當前結點的前驅節點,next記錄當前結點的後繼節點。

在AQS中維護了一個單一的狀態信息state,可以通過getstate,setstate,compareandsetstate函數修改其值。對於reentrantlock來說,state是用來表示當前線程獲取鎖的可重入次數;(前面已經進行了詳細說明),對於countdownlatch來說,state用來表示當前計數器的值,前面的文章也對countdownlatch進行過詳細說明。

抽象,並不代表它是抽象類,而是指它在使用上並不是單獨使用的,而是用來構成鎖或者其它同步組件才有意義。

如果前面的看不懂可以看完全部的再到回來看。

為實現依賴於先進先出 (FIFO) 等待隊列的阻塞鎖和相關同步器(信號量、事件,等等)提供一個框架。此類的設計目標是成為依靠單個原子 int 值來表示狀態的大多數同步器的一個有用基礎 -- 摘自API

上面這段話的意思就是說這個AQS類是一個結構為FIFO的雙向隊列的小框架,基於這個小框架你可以去設計阻塞鎖和其他的同步器。然後是這個類裡面有一個int值表示狀態,在同步器裡面有用。

此類支持默認的獨佔 模式和共享 模式之一,或者二者都支持。處於獨佔模式下時,其他線程試圖獲取該鎖將無法取得成功。在共享模式下,多個線程獲取某個鎖可能(但不是一定)會獲得成功。此類並不「了解」這些不同,除了機械地意識到當在共享模式下成功獲取某一鎖時,下一個等待線程(如果存在)也必須確定自己是否可以成功獲取該鎖。處於不同模式下的等待線程可以共享相同的 FIFO 隊列。通常,實現子類只支持其中一種模式,但兩種模式都可以在(例如)ReadWriteLock 中發揮作用。只支持獨佔模式或者只支持共享模式的子類不必定義支持未使用模式的方法。

這裡的AQS使用了模板方法的設計模式,如果大家不了解可以去網上了解下,或者看後面的結構思路也可以了解到。

  • tryAcquire(int) 獲取
  • tryRelease(int) 釋放
  • tryAcquireShared(int)
  • tryReleaseShared(int)
  • isHeldExclusively()

默認情況下,每個方法都拋出 UnsupportedOperationException。這些方法的實現在內部必須是線程安全的,通常應該很短並且不被阻塞。定義這些方法是使用此類的唯一 受支持的方式。其他所有方法都被聲明為 final,因為它們無法是各不相同的 -- 摘自API

上面兩段話是啥意思呢?

就是這裡有兩種模式,獨佔鎖和共享鎖。由名字即可得意義。

然後後面有5個方法,當你繼承AQS這個類,重寫前兩個方法時就是獨佔模式,重寫後兩個方法時是共享模式,isHeldExclusively()方法是問是否為獨佔式的?是則返回true,不是返回false。

然後其它的方法都是有final修飾,不能被重寫,所以大家都一樣。

然後API中有個基於AQS寫的一個小demo:

class Mutex implements Lock, java.io.Serializable {

// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Report whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}

// Acquire the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// Release the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// Provide a Condition
Condition newCondition() { return new ConditionObject(); }

// Deserialize properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();

public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

這裡簡單解釋一下,理解意思就好:就是定義了一個Sync的類,它繼承了AQS,也就是使用AQS來實現Sync這個同步器。這個Sync大家可以記住,因為很多源碼中都會看到這個,到時候看到就不陌生了。 然後看這個類是只實現了

  • tryAcquire(int) 獲取
  • tryRelease(int) 釋放

這兩個方法,也就是說它是使用獨佔鎖的模式。然後後面生成Sync實例後,調用了很多沒看過的方法,這些方法都是AQS自帶的,沒有修改過的。

然後我們來看看這個類具體的API:

compareAndSetState 這個方法應該很熟悉了吧,前面講的CAS,保證操作的原子性。

getState setState 就是對一個狀態值的獲取和修改

後面的api大家可以自行了解,前面也做了些許說明。

然後,我們從源碼對這個AQS進行進一步的了解,前面是建立了一些基本概念,現在來具體化。

打開ReentrantLock的源碼,然後我們關注他的lock和unlock方法,

public void lock() {
sync.lock();
}

這是代碼,然後我們再來看sync,

abstract static class Sync extends AbstractQueuedSynchronizer

沒錯,就是一個繼承AQS的同步器,和我們前面看的demo一模一樣。然後我們再來看sync的lock方法

abstract void lock();

是個抽象方法,原因在於reentrantlock有公平鎖和非公平鎖之分,這裡簡單介紹一下,後面的文章再進行詳細介紹。我們平常使用的時候都是new reentrantlock(),裡面沒有傳遞參數,此時默認為非公平鎖,相當於new reentrantlock(false),當傳入true時即為公平鎖。啥是公平鎖和非公平鎖呢?比如當A線程獲得鎖,然後B線程來請求該鎖,接著C線程後面來請求該鎖,那麼當A釋放鎖後,如果是公平鎖,那麼久會把C線程掛起,讓B線程獲取鎖。如果是非公平鎖,那麼B和C都可能獲得鎖,也就是沒有了先到先得的規則。

所以,sync有兩個子類:NonfairSync和FairSync。非公平的同步器和公平的同步器。

我們來看看fairsync的源碼:

static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Dont grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

這裡關注他的lock方法,裡面是acquire(1),這裡的acquire是父類AQS裡面的方法:

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

然後再進入AQS的tryacquire:

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

則是拋出異常,這裡就要聯繫我們前面說的重寫那幾個方法了,如果不重寫就會拋出UnsupportedOperationException的異常,所以在FairSync里就重寫了tryacquire方法。

也就是說,子類先調用父類方法,然後再父類再調用回子類方法。這裡和模板方法模式有關,在這裡就不再展開話題了,後面寫文章對此設計模式進行詳細講解。

然後我們來仔細分析tryacquire方法:

這裡就相當於我們前面的手寫一個可重入的鎖lock裡面的lock方法:

不記得的可以去翻看一下,對於這裡的理解挺重要的,這裡的state和lock里的count就是一樣的工作機制,這裡也有記錄當前線程的量。

下面是公平性的代碼:

protected final boolean tryAcquire(int acquires) { //傳入的參數就是1

final Thread current = Thread.currentThread(); //記錄下此時的線程是哪個 int c = getState(); //獲取此時的state值 if (c == 0) { //如果是0,那說明是鎖空閑,第一個線程進來 if (!hasQueuedPredecessors() && //這個方法是用來保證公平性的 compareAndSetState(0, acquires)) { //嘗試CAS操作獲取該鎖,把AQS的狀態值從0設置為1. 在CAS中 V=0;A=0;B=acquires=1。

setExclusiveOwnerThread(current); //設置當前鎖的擁有者為當前線程,然後返回true。

return true; //Exclusive 獨佔的,這個單詞好好記住

} } else if (current == getExclusiveOwnerThread()) { //如果此線程是前面已經獲得鎖的線程,即代表重入了,則把state加1.然後獲得鎖,返回true,不進行阻塞。 int nextc = c + acquires; if (nextc < 0) //說明可重入次數溢出了 throw new Error("Maximum lock count exceeded"); setState(nextc); return true;

}

return false;//如果不是上面兩種情況,即既不是第一次,也不是鎖的持有者就返回false,然後就被放入AQS阻塞隊列。 }

我們再來對比一下非公平性的代碼:

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { (A)
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { (B)
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; (C)
}

這上面的代碼現在看起來簡單多了吧,差別就是一個!hasQueuedPredecessors()的事情。

我們來分析一下非公平性的原理:

當第一個線程進來,進入到A部分代碼,發現c=0,所以獲得鎖,返回true走人。

當第二個線程進來,進入到A部分代碼,發現c!=0,然後接著運行B部分代碼,發現當前線程並不是擁有鎖的線程,所以運行到C部分的代碼,返回false,存入AQS的阻塞隊列當中。

此時第一個線程忽然釋放了這個鎖,然後馬上進來了第三個線程,調用lock方法後進入到A部分代碼,發現state為0,所以馬上獲取鎖去幹活了。而此時第二個線程依然保存在阻塞隊列當中。

所以,這段代碼並沒有先來後到的規則,也就是我們說的不公平的。

然後我們來仔細分析一下這個實現公平性的方法:hasQueuedPredecessors()

public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

記得AQS里維護了一個雙向隊列,這裡先獲取了頭尾節點。然後作者寫得極其簡略,以至於直接看不太好理解。

先看h=t的情況,頭尾節點相同即說明隊列為空。即返回false,聯繫前面的代碼,取反,返回false,然後繼續進行CAS操作,則線程獲取鎖後直接運行,此時無公平性之說。

然後看h!=t,並且s=null則說明有一個元素將要作為AQS的第一個節點放入隊列。(此為數據結構的基礎知識),然後就返回true,即不進行接下來的CAS操作,退出。

然後看h!=t,並且s!=null和s.thread!=thread.currentthread(),則說明隊列里的第一個元素不是當前線程,那麼返回true,不繼續進行CAS操作,退出。

然後結合下面的理解,我們知道:退出的線程都在後面被加入這個雙向阻塞隊列當中了,也就是去排隊了,哈哈。

然後再來看tryacquire方法返回後在acquire里的邏輯,當成功獲得鎖,就返回true,取反為false,即為假,然後跳出if。然後我們知道當if中的邏輯關係為&&時,如果第一個為真,即獲得鎖失敗返回false,取反後為true時,它要繼續執行後面的條件(這裡要你基礎紮實才知道它的執行邏輯),即 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ,則將當前線程封裝類型為Node.EXCLUSIVE的node節點後,插入到AQS隊列的的尾部,並調用locksupport.park(this)方法掛起自己。

成功獲得鎖的情況只有兩種,要麼第一個來獲得鎖,要麼是重入。

下面是AQS中的代碼:

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

然後,我們來看addWaiter這個方法:

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

這個方法就和我們說的AQS內部維護的雙向隊列息息相關了,就是把當前線程添加到這個雙向隊列當中,然後添加時又討論隊列處於什麼狀況,然後怎麼添加。對數據結構稍微熟悉一些就比較容易看懂。這個enq方法有點深度,先大致理解作用吧。

感覺在講這些方法之前應該先對這個AQS的雙向隊列進行講解先:(建議跟著源碼一起看)

AQS是一個FIFO的雙向隊列,其內部通過節點head和tail記錄隊首和隊尾元素,隊列元素的類型為node。其中node中的thread變數用來存儲放進AQS隊列裡面的線程。node節點內部的shared是用來標記該線程是獲取共享資源時被阻塞掛起後放入AQS隊列的,exclusive是用來標記線程是獲取獨佔資源時被掛起後放入AQS隊列的。waitstatus記錄當前線程的等待狀態,可以為cancelled(線程被取消了),signal(線程需要被喚醒),condition(線程在條件隊列裡面等待),propagate(釋放共享資源時需要通知其他節點)。prev記錄當前結點的前驅節點,next記錄當前結點的後繼節點。

在AQS中維護了一個單一的狀態信息state,可以通過getstate,setstate,compareandsetstate函數修改其值。對於reentrantlock來說,state是用來表示當前線程獲取鎖的可重入次數;(前面已經進行了詳細說明),對於countdownlatch來說,state用來表示當前計數器的值,前面的文章也對countdownlatch進行過詳細說明。

接著來說一下reentrantlock中釋放鎖的方法:

public void unlock() {

sync.release(1); } public final boolean release(int arg) { //這個是AQS當中的,聯繫前面的acquire方法。 if (tryRelease(arg)) { //當線程成功釋放鎖後,

Node h = head;

if (h != null && h.waitStatus != 0) unparkSuccessor(h);//進入這個方法就知道,有 LockSupport.unpark(s.thread);方法激活AQS隊列裡面被阻塞一個線程。被激活的線程則使用tryacquire嘗試,看當前狀態變數state的值是否滿足自己的需要,滿足則線程被激活,然後繼續向下運行,否則還是會被放入AQS隊列並被掛起。 return true; } return false; }

這個和我們前面文章的手寫的釋放鎖的也差不多:

protected final boolean tryRelease(int releases) {

int c = getState() - releases; //count值減一 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //如果當前線程沒有持有該鎖但調用了該方法,則會拋出這個異常。作者考慮的比較周全 boolean free = false; if (c == 0) { 當可重入次數為0時,當前線程就會釋放該鎖,即設置當前鎖的擁有者為null. free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

然後?

AQS和reentrantlock先這樣大致講完了。還有些內容沒說,但我發覺這些內容需要其他基礎再來看比較好,不然就是一直在迷宮裡繞著。也不知道大家看懂沒有,思路可能極其混亂,建議還是視頻配上書看比較好。後面還會寫文章進行補充。只要把框架的大體架構記住,然後再綜合分析各個細節就比較容易看懂。需要的基礎比較多。。。


推薦閱讀:
查看原文 >>
相关文章