一個EOF引發的探索之路之三(golang鎖源碼探索篇)
前言:
一直使用鎖,大學的時候也在操作系統中學過鎖的概念以及實現的思路,但是從未真正的從代碼級別看過如何實現,趁著這個機會,看看。一、為何golang底層的socket的底層讀寫都要加鎖
當讀到netFD的源碼的時候,發現netFD的Read和Write函數都會加鎖,查看netFD的結構體,發現它加了一個fdmu fdMutex
看其類型名稱,就知道它是專門為文件描述符定製的鎖,下面以這個為切入點探索下各種鎖。
1.1 golang互斥鎖原理互斥鎖是我是在golang中最常用的鎖了,互斥鎖是在多協程的時候使用,在某一時刻只能有一個協程佔用資源。結合在網路上查閱的資料、golang鎖的源碼,現總結如下: golang 提供的互斥鎖為sync.Mutex,並且只提供了lock和unlock,我們就以這兩個函數為切入點。 先來看看Mutex的結構體type Mutex struct {
state int32 //互斥鎖上鎖狀態
sema uint32 //信號量,向處於Gwaitting的G發送信號
}
const (
mutexLocked = 1 << iota // 1 互斥鎖是鎖定的
mutexWoken // 2 喚醒鎖
mutexWaiterShift = iota // 2 統計阻塞在這個互斥鎖上的goroutine數目需要移位的數值
)
0位代表鎖的佔用狀態(1被佔用,0可用),1位代表當前協程是否被喚醒(1被喚醒,0sleep中),2~31位代表當前阻塞在Mutex的協程數量。
ok,先上Lock代碼+翻譯func (m *Mutex) Lock() {
//首先查看當前的state是否為0(未被佔用),如果為0,則把state的第0位置為1,表示佔用當前鎖,返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
//初始化喚醒標誌位為否,自旋鎖次數置為0
awoke := false
iter := 0
for {
//保存當前鎖狀態
old := m.state
//創建一個新的鎖狀態,把鎖佔用狀態設置為1
new := old | mutexLocked
//判斷當前鎖是否被佔用
if old&mutexLocked != 0 {
//是否可以進入自旋
if runtime_canSpin(iter) {
//條件:1.我們設置的喚醒標誌位為否
// 2.當前鎖的喚醒標誌位為否
// 3.還有協程阻塞在當前的鎖
// 4.嘗試設置當前鎖的喚醒標誌位為1(目的是用於通知佔用當前鎖的協程在釋放鎖的時候別再喚醒其他的協程了,稍後看下unlock的源碼就可以驗證)
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//把喚醒標誌位設為true
awoke = true
}
//開始自旋
runtime_doSpin()
//自旋次數加1
iter++
continue
}
//把鎖的等待個數加1
new = old + 1<<mutexWaiterShift
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
//清除喚醒標誌
new &^= mutexWoken
}
//嘗試更新當前鎖的狀態
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//如果當前鎖不在被佔用,結束循環
if old&mutexLocked == 0 {
break
}
//如果所還在被佔用,則把當前協程sleep,等待被喚醒
runtime_SemacquireMutex(&m.sema)
//當前協程被喚醒後,設置該協程的喚醒標誌位為true
awoke = true
iter = 0
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
- 我們看到代碼中有很多的位移操作,與、或這些都比較簡單,但是有一個「&^」的操作符,解釋下它的含義 此運算符是雙目運算符,按位計算,將運算符左邊數據相異的位保留,相同位清零。
- race這個玩意目前先忽略,與我們理解鎖沒有關係,這個是go做race檢測時候用的。
- 我們看到很多地方用到了atomic包的一些函數,atomic包是由golang提供的low-level的原子操作封裝,主要用來解決進程同步為題,官方並不建議直接使用。操作系統級的鎖的實現方案是提供原子操作,然後基本上所有鎖相關都是通過這些原子操作來實現。
- CompareAndSwapInt32(&addr, old, new)的含義是:如果*addr == old ,那麼*addr = new
- runtime_canSpin和runtime_doSpin分別是判斷是否可以自選與自旋操作 golang的互斥鎖雖然引用了自旋鎖,但是不會一直的自旋下去,只是會嘗試幾次,看下其源碼
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we dont do passive spinning here,
// because there can be work on global runq on on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
2).GOMAXPROCS>1
3).至少有一個運行的P並且local的P隊列為空(與協程的調度有關知識,稍後詳細介紹) 4).自旋次數不超過設定的閾值 那麼繼續看下runtime_doSpin幹了什麼func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
- 我們再來分析下runtime_SemacquireMutex幹了什麼
func runtime_SemacquireMutex(*uint32)
func sync_runtime_SemacquireMutex(addr *uint32) {
semacquire(addr, semaBlockProfile|semaMutexProfile)
}
func semacquire(addr *uint32, profile bool) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
if cansemacquire(addr) {
return
}
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
if profile && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
for {
lock(&root.lock)
atomic.Xadd(&root.nwait, 1)
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
root.queue(addr, s)
goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
if cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3)
}
releaseSudog(s)
}
1).cansemacquire這個函數的作用其實就是搶佔信號量,來看下源碼
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
type semaRoot struct {
lock mutex
head *sudog
tail *sudog
nwait uint32 // Number of waiters. Read w/o the lock.
}
3).在for循環中我們看到一個atomic.Xadd(&root.nwait, -1),看到atomic我們就知道這個操作是系統級原子操作,它的作用就是對nwait做減1的操作。
4).接著我們來看goparkunlock,它的作用就厲害了,該函數是解開當前協程與實際的執行體的聯繫,使得當前協程休眠,釋放執行體,這個設計到goroutine的調度原理,稍後會詳細介紹。那麼這個函數什麼時候執行完畢呢?當該協程被喚醒的時候,一般該協程被喚醒也就是佔用該信號量的協程釋放資源的時候,這時它調用cansemacquire返回為true,否則繼續調用goparkunlock休眠,直到搶到信號量則break。總結下這個函數的作用,其實就是把首先要爭奪下信號量,如果爭奪不成功,就把自己放到爭奪信號量的隊列中,並且休眠,直到被喚醒為止。
ok,這幾個預備的知識終於描述完了,我們來總體看下互斥鎖的流程。由於裡面的邏輯比較繞,最好找幾個情況下的例子走一遍流程,也許會更好。 如果你看著上圖還是覺得饒,那麼我建議你結合unlock的流程來看,也許會好一些。先看下unlock的源碼func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}
old = m.state
}
}
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
...
}
if awoke {
...
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime_SemacquireMutex(&m.sema)
awoke = true
iter = 0
}
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
1).當前的協程等待數量為0,則不用喚醒協程了。
2).當前協程被鎖,或者被喚醒了 第二個條件就可以跟Lock中的自旋時候的操作對應上,我們之前描述在lock中自旋的時候,會把鎖的喚醒標誌位設置為true,用於提醒佔用鎖的協程在釋放鎖的時候不要再喚醒其他協程了,就是我們的第二個條件。 4.首先更顯下new,等於old上的等待協程數量減1,然後在把喚醒標誌位設置true。接著有一個if判斷,if atomic.CompareAndSwapInt32(&m.state, old, new)這句話的含義是,判斷當前鎖的狀態在程序運行的這段時間沒有變化的話,就直接把鎖狀態更新為new,注意什麼情況下不成功呢?這就與第1不中的『小思考』聯繫上了, 如同小思考中說的,如果在這段時間鎖被直接搶佔了,if就會判斷為false,如果判斷為false,會直接 old = m.state 更新下old的狀態,這樣的話會繼續循環,再次到if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 01.2 golang的socket的加鎖原理下面進入本文的主題,講述對socket讀寫時候加鎖的源碼。 對socket讀寫加鎖的是先看下fdMutex結構體的組成,state指的是鎖的狀態信息,rsema和wsema分別指的是讀的信號量與寫的信號量。
fdMutex關於讀和寫的通過參數的方式進行區分,二者採用了相同的函數rwlock和rwunlock,參數為bool類型,true代表讀鎖操作,false代表寫鎖操作。 解釋下const中常量的作用,這些常量是用於對state變數進行未操作來讀取state中不同位代表的含義的值。 state是一個64位的變數ok,我們分別看下rwlock和rwunlock源碼func (mu *fdMutex) rwlock(read bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
mutexBit = mutexRLock
mutexWait = mutexRWait
mutexMask = mutexRMask
mutexSema = &mu.rsema
} else {
mutexBit = mutexWLock
mutexWait = mutexWWait
mutexMask = mutexWMask
mutexSema = &mu.wsema
}
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
var new uint64
if old&mutexBit == 0 {
new = (old | mutexBit) + mutexRef
if new&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
} else {
new = old + mutexWait
if new&mutexMask == 0 {
panic("net: inconsistent fdMutex")
}
}
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
if old&mutexBit == 0 {
return true
}
runtime_Semacquire(mutexSema)
}
}
}
我們tcp的socket是全雙工的,鎖的這個特性也可以理解。
2).我們看for循環,首先讀取當前鎖的狀態存儲在old中,這裡有一個疑問,在存儲當前變數的時候,用了atomic.LoadUint64函數,我們知道這是一個原子,為什麼要用原子操作呢?這不就是一個對整型變數的賦值嗎?難道整型的賦值不是一個原子操作嗎?記得剛才我們看到的互斥鎖,在存儲當前鎖狀態的時候是直接賦值的。old := m.state這裡留一個個人疑問,後續確認原因後補充。 ok,繼續。接著判斷當前鎖是否已經被關閉了,如果被關閉直接返回false,加鎖失敗。 3).接著判斷鎖是否被佔用,如果未被佔用:把old佔用標誌位置為1,並且把當前鎖引用數+1存儲到new中,接著判斷new中的引用個數是否為0,如果為0就是一個異常了。這個判斷引起了我的一個思考: 什麼情況下會使得引用數為0呢,可能是異常的操作把其置為0,這是可以理解的。但是還有一個極端的情況,如果鎖的引用個數剛好為,你再加1,豈不是再去讀取引用個數就為0了,引用個數還是有一個極限的,當然,那些讀鎖等待數量與寫鎖等待數量同樣有這樣的問題。當然了,這種情況極少會發生,哪有那麼多的協程同時去操作一個鎖。 ok,繼續。如果判斷鎖已經被佔用:那麼就把old中的鎖等待數量加1然後放到new,接著同樣的道理檢測下new中的等待數量是否為0,如果為0則是異常情況。 4).接著就要修改當前鎖的狀態了,如果再第3)步中搶佔鎖成功,則會把鎖狀態改為搶佔成功的new,下面接著下面加一個判斷,old中的鎖是否被搶佔(這個等會有用),如果未被搶佔,則返回true,搶鎖成功;如果第3)步中搶佔鎖失敗,同樣把鎖狀態改為new(只是等待數量加1),下面的判斷就用到了,old的鎖在這時已經被搶佔的,代碼會往下走,調用runtime_Semacquire,看起來跟互斥鎖的runtime_SemacquireMutex很相似,作用應該差不多,把當前協程掛起,等待被喚醒。被喚醒後會再次循環,這個時候鎖的佔用狀態應該為false。 我們來看下調用runtime_Semacquire//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire(addr, semaBlockProfile)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32) {
semacquire(addr, semaBlockProfile|semaMutexProfile)
}
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem)
}
}
if race.Enabled {
race.Enable()
}
}
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Release(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem)
}
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
推薦閱讀: