帶注釋的源碼在文章最後。。。

本文基於go 1.9.3。從go的Mutex源碼中看下互斥鎖的基本實現和考慮是怎樣的。有錯誤地方歡迎指正。

  1. CAS原子操作。
  2. 需要有一種阻塞和喚醒機制。
  3. 盡量減少阻塞和喚醒切換成本。
  4. 鎖盡量公平,後來者要排隊。即使被後來者插隊了,也要照顧先來者,不能有「飢餓」現象。

先看3,4點。再看2,1點。最後是源碼。

盡量減少阻塞和喚醒切換成本

減少切換成本的方法就是不切換,簡單而直接。

不切換的方式就是讓競爭者自旋。自旋一會兒,然後搶鎖。不成功就再自旋。到達上限次數才阻塞。

自旋就是CPU空轉一定的時鐘週期

不同平臺上自旋所用的指令不一樣。例如在amd64平臺下,彙編的實現如下

TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
// 自旋cycles次,每次自旋執行PAUSE指令
PAUSE
SUBL $1, AX
JNZ again
RET

是否允許自旋的判斷是嚴格的。而且最多自旋四次,每次30個CPU時鐘週期。

能不能自旋全由這個條件語句決定if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter)

翻譯下,就是下面的條件都滿足,才允許自旋。

  1. 鎖已被佔用,並且鎖不處於飢餓模式。
  2. 積累的自旋次數小於最大自旋次數(active_spin=4)。
  3. cpu核數大於1。
  4. 有空閑的P。
  5. 當前goroutine所掛載的P下,本地待運行隊列為空。

可以看到自旋要求嚴格,畢竟在鎖競爭激烈時,還無限制地自旋就肯定會影響其他goroutine。

const active_spin = 4
func sync_runtime_canSpin(i int) bool {
// 自旋次數不能大於 active_spin(4) 次
// cpu核數只有一個,不能自旋
// 沒有空閑的p了,不能自旋
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
// 當前g綁定的p裡面本地待運行隊列不為空,不能自旋
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

鎖模式介紹

上面的出現了兩個常量,mutexStarving和mutexLocked。它們與鎖對象結構有關。比較基礎,這裡介紹一下。

type Mutex struct {
// [阻塞的goroutine個數, starving標識, woken標識, locked標識]
state int32
sema uint32
}

Mutex結構簡單的就只有兩個成員變數。sema是信號量,下文會介紹到。這裡主要介紹state的結構。

一個32位的變數,被劃分成上圖的樣子。右邊的標識也有對應的常量

const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
)

含義如下:

  • mutexLocked對應右邊低位第一個bit。值為1,表示鎖被佔用。值為0,表示鎖未被佔用。
  • mutexWoken對應右邊低位第二個bit。值為1,表示打上喚醒標記。值為0,表示沒有喚醒標記。
  • mutexStarving對應右邊低位第三個bit。值為1,表示鎖處於飢餓模式。值為0,表示鎖存於正常模式。
  • mutexWaiterShift是偏移量。它值為3。用法是state>>=mutexWaiterShift之後,state的值就表示當前阻塞等待鎖的goroutine個數。最多可以阻塞2^29個goroutine。

Mutex鎖分為兩種模式,正常模式飢餓模式

正常模式下,對於新來的goroutine而言,它有兩種選擇,要麼搶到了鎖,直接執行;要麼搶不到鎖,追加到阻塞隊列尾部,等待被喚醒的。

飢餓模式下,對於新來的goroutine,它只有一個選擇,就是追加到阻塞隊列尾部,等待被喚醒的。而且在該模式下,所有鎖競爭者都不能自旋。

除了這兩種模式。還有一個Woken(喚醒標記)。它主要用於自旋狀態的通知鎖公平性的保證。分兩個角度理解:

一、新的goroutine申請鎖時,發現鎖被佔用了。但自己滿足自旋條件,於是自己自旋,並設置上的Woken標記。此時佔用鎖的goroutine在釋放鎖時,檢查Woken標記,如果被標記。哪怕現在鎖上面的阻塞隊列不為空,也不做喚醒。直接return,讓自旋著的goroutine有更大機會搶到鎖。

if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}

二、釋放鎖時,檢查Woken標記為空。而阻塞隊列裏有goroutine需要被喚醒。那麼在喚醒時,同時標記鎖Woken。這裡可能有疑問,原來沒有Woken標記,為什麼在喚醒一個goroutine要主動標記呢?目的是保證鎖公平。

考慮這樣的場景:現在阻塞隊列裏只有一個goroutine。把它喚醒後,還得等調度器運行到它,它自己再去搶鎖。但在調度器運行到它之前,很可能新的競爭者參與進來,此時鎖被搶走的概率就很大。

這有失公平,被阻塞的goroutine是先到者,新的競爭者是後來者。應該盡量讓它們一起競爭。

// 喚醒一個阻塞的goroutine,並把鎖的Woken標記設置上
new = (old - 1<<mutexWaiterShift) | mutexWoken

設置Woken標記後,state就肯定不為零。此時新來的競爭者,在執行Lock()的fast-path時會失敗,接下來就只能乖乖排隊了。

func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// Woken標記設置後,這裡的CAS就會為false
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
// ...
return
}
// 接下來在阻塞裏排隊
}

小總結:為了減少切換成本,短暫的自旋等待是簡單的方法。而競爭者在自旋時,要主動設置Woken標記。這樣釋放者才能感知到。

鎖盡量公平

為什麼不是絕對公平?要絕對公平的粗暴做法就是在鎖被佔用後,其它所有競爭者,包括新來的,全部排隊。

但排隊的問題也很明顯,排隊阻塞喚醒的切換成本(這是損耗性能的潛在的隱患,下面Mutex的問題有舉例)。假如臨界區代碼執行只需要十幾個時鐘週期時,讓競爭者自旋等待一下,立刻就可以獲得鎖。減少不必要的切換成本,效率更高。

盡量公平的結果就是阻塞的競爭者被喚醒後,也要與(正在自旋的)新競爭者搶奪鎖資源。

go使用三種手段保證Mutex鎖盡量公平:

  1. 上面介紹的,在鎖釋放時,主動設置Woken標記,防止新的競爭者輕易搶到鎖。
  2. 競爭者進阻塞隊列策略不一樣。新的競爭者,搶不到鎖,就排在隊列尾部。先來競爭者,從隊列中被喚醒後,還是搶不到鎖,就放在隊列頭部。
  3. 任何競爭者,被阻塞等待的時間超過指定閥值(1ms)。鎖就轉為飢餓模式。這時鎖釋放時會喚醒它們,手遞手式把鎖資源給它們。別的競爭者(包括新來的)都搶不到。直到把飢餓問題解決掉。

飢餓問題是會積壓的。要儘快解決。舉個例子解釋一下:

藍色是新競爭者,紅色是阻塞等待時間超過閥值的競爭者。每次持鎖時間是0.3ms。

只要有競爭者阻塞超時了,鎖就會轉換為飢餓模式。飢餓模式下,所有的新競爭者都得排隊。

圖中時刻4中的G3就是被積壓的。如果時刻0中的競爭者更多時,並且搶鎖順序不變。那麼時刻4的積壓就更嚴重。

同時反映出一個問題。

Mutex帶來的問題

假設在業務某個場景中,對每個請求都需要訪問某互斥資源。使用Mutex鎖時,如果QPS很高,阻塞隊列肯定會很滿。雖然QPS可能會降,但請求是持續的。

新來的請求,在訪問互斥資源時有可能搶鎖成功,後來者勝於先到者。這種情況持續發生的話,就會導致阻塞隊列中所有的請求得不到處理,耗時增高,直至超出上游設置的超時時間,一下子失敗率突增,上游再影響它的上游,引起連鎖反應進而服務故障異常。

解決方案要根據實際業務場景來優化。削減鎖的粒度;或者使用CAS的方式進隊列,然後阻塞在通道上;或者使用無鎖結構等等。

阻塞在通道而不是阻塞的鎖上,是因為go的runtime對待鎖喚醒和通道喚醒goroutine的效率是不一樣的。這也引出了還有一種方案是改runtime,讓鎖喚醒的goroutine更快地得到執行。畢竟上面問題點是被喚醒的goroutine和新的goroutine在競爭中不能保證穩勝,被喚醒的goroutine會有一個調度耗時,減少耗時就有可能提高競爭成功率。

阻塞和喚醒機制

go的阻塞和喚醒是semacquire和semrelease。雖然命名上是sema,但實際用途卻是一套阻塞喚醒機制。

// That is, dont think of these as semaphores.
// Think of them as a way to implement sleep and wakeup

其實這個阻塞和喚醒機制,完全可以另寫一篇。不過配合Mutex鎖的理解這兒就先簡單介紹下。

go的runtime有一個全局變數semtable,它放置了所有的信號量。

var semtable [semTabSize]struct {
root semaRoot
pad [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags)
func semrelease1(addr *uint32, handoff bool)

每個信號量都由一個變數地址指定。Mutex就是用成員sema的地址。

在阻塞時,調用semacquire1,把地址(addr)傳給它。

如果addr大於1,並且通過CAS減一成功,那就說明獲取信號量成功。不用阻塞。

否則,semacquire1會在semtable數組中找一個元素和它對應上。每個元素都有一個root,這個root是Treap樹(ACM同學應該熟悉)。

最後addr變成一個樹節點,這個樹節點,有自己的一個隊列,專門放被阻塞的goroutine。叫它阻塞隊列吧。

這個阻塞隊列是個雙端隊列,頭尾都可以進。

semacquire1把當前goroutine相關元數據放進阻塞隊列之後,就掛起了。

semrelease1是給addr CAS加一。

如果堅持發現當前addr上有阻塞的goroutine時,就取一個出來,喚醒它,讓它自己再去semacquire1。這是handoff為false的情況。

但handoff為true的話,就嘗試手遞手地把信號量送給這個goroutine。等於說goroutine不用再自己去搶了,因為自己再去搶有可能搶不到。

最後semrelease1會把取出來的這個goroutine掛在當前P的本地待運行隊列尾部,等待調度執行。

就是這樣,在獲取不到Mutex鎖時,通過信號量來阻塞和喚醒goroutine。

CAS原子操作

CAS就是基本的原子操作。沒什麼好說的。

例如在amd64上,go的彙編實現:

TEXT ·CompareAndSwapUint32(SB),NOSPLIT,$0-17
MOVV addr+0(FP), R1
MOVW old+8(FP), R2
MOVW new+12(FP), R5
SYNC
cas_again:
MOVV R5, R3
LL (R1), R4
BNE R2, R4, cas_fail
SC R3, (R1)
BEQ R3, cas_again
MOVV $1, R1
MOVB R1, swapped+16(FP)
SYNC
RET
cas_fail:
MOVV $0, R1
JMP -4(PC)

源碼

type Mutex struct {
// [阻塞的goroutine個數, starving標識, woken標識, locked標識]
// [0~28, 1, 1, 1]
state int32
sema uint32
}

const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken // 喚醒標記
mutexStarving // 飢餓模式
mutexWaiterShift = iota // 位移數

starvationThresholdNs = 1e6 // 阻塞時間閥值1ms
)

func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 嘗試CAS上鎖
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
// 上鎖成功,直接返回
return
}

var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {

// 進入到這個循環的,有兩種角色goroutine
// 一種是新來的goroutine。另一種是被喚醒的goroutine。所以它們可能在這個地方再一起競爭鎖
// 如果新來的goroutine搶成功了,那另一個只能再阻塞著等待。但超過1ms後,鎖會轉換成飢餓模式
// 在這個模式下,所有新來的goroutine必須排在隊伍的後面。沒有搶鎖資格

// 飢餓模式下,不能自旋
// 鎖被佔用了,不能自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// woken位沒有被設置;被阻塞等待goroutine的個數大於0
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 可以自旋了,那就設置上woken位,在unlock時,如果發現有別的goroutine在自旋,就立即返回,有被阻塞的goroutine也不喚醒了
awoke = true
}
// runtime_doSpin -> sync_runtime_doSpin
// 每次自旋30個時鐘週期,最多120個週期
runtime_doSpin()
iter++
old = m.state
continue
}

// 自旋完了還是等不到鎖 或 可以上鎖

new := old
// 飢餓模式下的鎖不搶
if old&mutexStarving == 0 {
// 非飢餓模式下,可以搶鎖
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 已經被上鎖了,或鎖處於飢餓模式下,就阻塞當前的goroutine
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
// 當前的goroutine已經被餓著了,所以要把鎖設置為飢餓模式
new |= mutexStarving
}
if awoke {
// 當前的goroutine有自旋過,但現在已經自旋結束了。所以要取消woken模式
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
// 取消woken標誌
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
// 成功上鎖
break // locked the mutex with CAS
}

// 主要是為了和第一次調用的Lock的g劃分不同的優先順序
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 使用信號量阻塞當前的g
// 如果當前g已經阻塞等待過一次了,queueLifo被賦值true
runtime_SemacquireMutex(&m.sema, queueLifo)
// 判斷當前g是否被餓著了
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 飢餓模式下,被手遞手喂信號量喚醒的
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
panic("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift) // -7(111)
if !starving || old>>mutexWaiterShift == 1 {
// 退出飢餓模式
// 飢餓模式會影響自旋
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 不是手遞手的信號量,那就自己繼續競爭鎖
// 必須設置為true,這樣新一輪的CAS之前,就可以取消woken模式。
// 因為通過信號量釋放鎖時,為了保持公平性,會同時設置woken模式。
awoke = true
iter = 0
} else {
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}

// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
// 不能多次執行unclock()
panic("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
// 非飢餓模式
old := new
for {
// 沒有被阻塞的goroutine。直接返回
// 有阻塞的goroutine,但處於woken模式,直接返回
// 有阻塞的goroutine,但被上鎖了。可能發生在此for循環內,第一次CAS不成功。因為CAS前可能被新的goroutine搶到鎖。直接返回
// 有阻塞的goroutine,但鎖處於飢餓模式。可能發生在被阻塞的goroutine不是被喚醒調度的,而是被正常調度運行的。直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}

// 有阻塞的goroutine,喚醒一個或變為沒有阻塞的goroutine了就退出
// 這個被喚醒的goroutine還需要跟新來的goroutine競爭
// 如果只剩最後一個被阻塞的goroutine。喚醒它之後,state就變成0。
// 如果此刻來一個新的goroutine搶鎖,它有可能在goroutine被重新調度之前搶鎖成功。
// 這樣就失去公平性了,不能讓它那麼幹,所以這裡也要設置為woken模式。
// 因為Lock方法開始的fast path,CAS操作的old值是0。這裡設置woken模式成功後,後來者就只能乖乖排隊。保持了鎖的公平性
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// 飢餓模式
// 手遞手喚醒一個goroutine
runtime_Semrelease(&m.sema, true)
}
}

推薦閱讀:

相關文章