前言:

一直使用鎖,大學的時候也在操作系統中學過鎖的概念以及實現的思路,但是從未真正的從代碼級別看過如何實現,趁著這個機會,看看。

一、為何golang底層的socket的底層讀寫都要加鎖

當讀到netFD的源碼的時候,發現netFD的Read和Write函數都會加鎖,查看netFD的結構體,發現它加了一個

fdmu fdMutex

看其類型名稱,就知道它是專門為文件描述符定製的鎖,下面以這個為切入點探索下各種鎖。

1.1 golang互斥鎖原理互斥鎖是我是在golang中最常用的鎖了,互斥鎖是在多協程的時候使用,在某一時刻只能有一個協程佔用資源。結合在網路上查閱的資料、golang鎖的源碼,現總結如下: golang 提供的互斥鎖為sync.Mutex,並且只提供了lock和unlock,我們就以這兩個函數為切入點。 先來看看Mutex的結構體

type Mutex struct {
state int32 //互斥鎖上鎖狀態
sema uint32 //信號量,向處於Gwaitting的G發送信號
}

const (
mutexLocked = 1 << iota // 1 互斥鎖是鎖定的
mutexWoken // 2 喚醒鎖
mutexWaiterShift = iota // 2 統計阻塞在這個互斥鎖上的goroutine數目需要移位的數值
)

注意: golang的const設置常亮有一個小的知識點,iota的含義是在const中第幾行的意思,從第0行開始,比如mutexLocked = 1<<iota 最終等於1,mutexWoken為什麼等於2呢?因為const如果不顯示的賦值給變數,變數會複製上一行的賦值語句,也是1<<iota,但是它是在第一行,所以的應該是1<<1即為2,第三行的mutexWaiterShift的值直接等於iota,也就是2了。我們看到Mutex的採用的是信號量的機制,解釋下mutexLocked,mutexWoken,mutexWaiterShift三個變數作為一定恆定值,用於對state進行位移操作來修改或者讀取對應位的值。 state表示Mutex的狀態,是一個32位int類型,其不同位代表含義不同,

0位代表鎖的佔用狀態(1被佔用,0可用),1位代表當前協程是否被喚醒(1被喚醒,0sleep中),2~31位代表當前阻塞在Mutex的協程數量。

ok,先上Lock代碼+翻譯

func (m *Mutex) Lock() {
//首先查看當前的state是否為0(未被佔用),如果為0,則把state的第0位置為1,表示佔用當前鎖,返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
//初始化喚醒標誌位為否,自旋鎖次數置為0
awoke := false
iter := 0
for {
//保存當前鎖狀態
old := m.state
//創建一個新的鎖狀態,把鎖佔用狀態設置為1
new := old | mutexLocked
//判斷當前鎖是否被佔用
if old&mutexLocked != 0 {
//是否可以進入自旋
if runtime_canSpin(iter) {
//條件:1.我們設置的喚醒標誌位為否
// 2.當前鎖的喚醒標誌位為否
// 3.還有協程阻塞在當前的鎖
// 4.嘗試設置當前鎖的喚醒標誌位為1(目的是用於通知佔用當前鎖的協程在釋放鎖的時候別再喚醒其他的協程了,稍後看下unlock的源碼就可以驗證)
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//把喚醒標誌位設為true
awoke = true
}
//開始自旋
runtime_doSpin()
//自旋次數加1
iter++
continue
}
//把鎖的等待個數加1
new = old + 1<<mutexWaiterShift
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
//清除喚醒標誌
new &^= mutexWoken
}
//嘗試更新當前鎖的狀態
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//如果當前鎖不在被佔用,結束循環
if old&mutexLocked == 0 {
break
}
//如果所還在被佔用,則把當前協程sleep,等待被喚醒
runtime_SemacquireMutex(&m.sema)
//當前協程被喚醒後,設置該協程的喚醒標誌位為true
awoke = true
iter = 0
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

理解這段代碼也有幾個小知識先註明下:
  1. 我們看到代碼中有很多的位移操作,與、或這些都比較簡單,但是有一個「&^」的操作符,解釋下它的含義 此運算符是雙目運算符,按位計算,將運算符左邊數據相異的位保留,相同位清零。
  2. race這個玩意目前先忽略,與我們理解鎖沒有關係,這個是go做race檢測時候用的。
  3. 我們看到很多地方用到了atomic包的一些函數,atomic包是由golang提供的low-level的原子操作封裝,主要用來解決進程同步為題,官方並不建議直接使用。操作系統級的鎖的實現方案是提供原子操作,然後基本上所有鎖相關都是通過這些原子操作來實現。
  4. CompareAndSwapInt32(&addr, old, new)的含義是:如果*addr == old ,那麼*addr = new
  5. runtime_canSpin和runtime_doSpin分別是判斷是否可以自選與自旋操作 golang的互斥鎖雖然引用了自旋鎖,但是不會一直的自旋下去,只是會嘗試幾次,看下其源碼

func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we dont do passive spinning here,
// because there can be work on global runq on on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

限制條件: 1).多核(如果是單核cpu沒必要這麼空耗)

2).GOMAXPROCS>1

3).至少有一個運行的P並且local的P隊列為空(與協程的調度有關知識,稍後詳細介紹) 4).自旋次數不超過設定的閾值 那麼繼續看下runtime_doSpin幹了什麼

func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}

查閱資料結果:procyield函數是彙編語言實現。函數內部循環調用PAUSE指令。PAUSE指令什麼都不做,但是會消耗CPU時間,在執行PAUSE指令時,CPU不會對它做不必要的優化。 總結下,所謂自旋就是一直循環等待了,只是golang的互斥鎖並不會一直循環等待,只會在特定的條件下嘗試幾次。自旋的操作無非就是空耗CPU了。
  1. 我們再來分析下runtime_SemacquireMutex幹了什麼

func runtime_SemacquireMutex(*uint32)
func sync_runtime_SemacquireMutex(addr *uint32) {
semacquire(addr, semaBlockProfile|semaMutexProfile)
}
func semacquire(addr *uint32, profile bool) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
if cansemacquire(addr) {
return
}
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
if profile && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
for {
lock(&root.lock)
atomic.Xadd(&root.nwait, 1)
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
root.queue(addr, s)
goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
if cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3)
}
releaseSudog(s)
}

我只把我理解的部分說下

1).cansemacquire這個函數的作用其實就是搶佔信號量,來看下源碼

func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}

注意下,其實我們的互斥鎖的信號量最大也就是1 2).semroot(addr)它返回的是一個semroot類型的變數,看下這個結構體的聲明

type semaRoot struct {
lock mutex
head *sudog
tail *sudog
nwait uint32 // Number of waiters. Read w/o the lock.
}

解釋下,這個結構體其實就是維護信號量與爭奪信號量的協程的信息。結構體中維護了一個鏈表,用於保存等待信號量的協程信息,head和tail就是用於對該鏈表操作的入口,代碼中root.queue(addr, s) 實際上就是把當前的協程入鏈表中。nwait欄位代表的就是當前等待信號量的協程數量。 semaRoot結構體中還有一個mutex類型的鎖,注意這跟我們Mutex並非是同一中類型,這是golang內部使用的類型鎖,在這裡是對鏈表的操作加鎖,對於該鎖的詳細的源碼不再描述,感興趣同學可以看源碼。

3).在for循環中我們看到一個atomic.Xadd(&root.nwait, -1),看到atomic我們就知道這個操作是系統級原子操作,它的作用就是對nwait做減1的操作。

4).接著我們來看goparkunlock,它的作用就厲害了,該函數是解開當前協程與實際的執行體的聯繫,使得當前協程休眠,釋放執行體,這個設計到goroutine的調度原理,稍後會詳細介紹。那麼這個函數什麼時候執行完畢呢?當該協程被喚醒的時候,一般該協程被喚醒也就是佔用該信號量的協程釋放資源的時候,這時它調用cansemacquire返回為true,否則繼續調用goparkunlock休眠,直到搶到信號量則break。

總結下這個函數的作用,其實就是把首先要爭奪下信號量,如果爭奪不成功,就把自己放到爭奪信號量的隊列中,並且休眠,直到被喚醒為止。

ok,這幾個預備的知識終於描述完了,我們來總體看下互斥鎖的流程。

由於裡面的邏輯比較繞,最好找幾個情況下的例子走一遍流程,也許會更好。 如果你看著上圖還是覺得饒,那麼我建議你結合unlock的流程來看,也許會好一些。先看下unlock的源碼

func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}

old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}
old = m.state
}
}

unlock看起來就舒服很多了,ok我們來梳理下,當一個協程要釋放它鎖佔有的互斥鎖時: 1.首先它會把當前鎖的狀態值減1,在釋放前一般該鎖的最後一位為1,減1就是把佔用狀態設置為0(原子操作),然後把設置後的狀態保存下來。 小思考:我們來設想下,假如剛把當前鎖佔用狀態設置為0,剛好有另外一個協程進入lock的for循環,我們回過頭去看下Lock的源碼,

old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
...
}
if awoke {
...
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime_SemacquireMutex(&m.sema)
awoke = true
iter = 0
}

前兩個if沒進去,直接看第三個if,atomic.CompareAndSwapInt32(&m.state, old, new)執行成功後,搶佔鎖成功,if old&mutexLocked == 0 判斷為true,則直接跳出循環。 看來鎖就直接被另外的這個協程搶佔了,那麼會不會影響unlock的執行呢?實際是不會的(看第4步的解釋),我們繼續往下看。 2.接著,會判斷 if (new+mutexLocked)&mutexLocked==0這句話的含義其實就在 判斷剛進入unlock的時候,鎖的狀態是否為非佔用狀態,如果所未被佔用,而又調用unlock,是不允許的,需要拋異常 3.用old copy下new存儲下來,接著就進入了循環: 來看第一個判斷,

if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}

如果這個條件成立,直接返回,不用往下走了(往下走是喚醒等待信號量的協程),來看下這兩個或關係的條件:

1).當前的協程等待數量為0,則不用喚醒協程了。

2).當前協程被鎖,或者被喚醒了 第二個條件就可以跟Lock中的自旋時候的操作對應上,我們之前描述在lock中自旋的時候,會把鎖的喚醒標誌位設置為true,用於提醒佔用鎖的協程在釋放鎖的時候不要再喚醒其他協程了,就是我們的第二個條件。 4.首先更顯下new,等於old上的等待協程數量減1,然後在把喚醒標誌位設置true。接著有一個if判斷,if atomic.CompareAndSwapInt32(&m.state, old, new)這句話的含義是,判斷當前鎖的狀態在程序運行的這段時間沒有變化的話,就直接把鎖狀態更新為new,注意什麼情況下不成功呢?這就與第1不中的『小思考』聯繫上了, 如同小思考中說的,如果在這段時間鎖被直接搶佔了,if就會判斷為false,如果判斷為false,會直接 old = m.state 更新下old的狀態,這樣的話會繼續循環,再次到if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 01.2 golang的socket的加鎖原理下面進入本文的主題,講述對socket讀寫時候加鎖的源碼。 對socket讀寫加鎖的是

先看下fdMutex結構體的組成,state指的是鎖的狀態信息,rsema和wsema分別指的是讀的信號量與寫的信號量。

fdMutex關於讀和寫的通過參數的方式進行區分,二者採用了相同的函數rwlock和rwunlock,參數為bool類型,true代表讀鎖操作,false代表寫鎖操作。 解釋下const中常量的作用,這些常量是用於對state變數進行未操作來讀取state中不同位代表的含義的值。 state是一個64位的變數

ok,我們分別看下rwlock和rwunlock源碼

func (mu *fdMutex) rwlock(read bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
mutexBit = mutexRLock
mutexWait = mutexRWait
mutexMask = mutexRMask
mutexSema = &mu.rsema
} else {
mutexBit = mutexWLock
mutexWait = mutexWWait
mutexMask = mutexWMask
mutexSema = &mu.wsema
}
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
var new uint64
if old&mutexBit == 0 {
new = (old | mutexBit) + mutexRef
if new&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
} else {
new = old + mutexWait
if new&mutexMask == 0 {
panic("net: inconsistent fdMutex")
}
}
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
if old&mutexBit == 0 {
return true
}
runtime_Semacquire(mutexSema)
}
}
}

解讀下過程: 1).首先判斷是否為讀加鎖,這個地方我們能看出對socket的讀寫鎖操作沒什麼區別,而且上面我們也看到讀寫鎖除了在引用數量的地方有關聯,其他地方二者使用的是state不同的位,也就是說讀和寫操作一般情況下是沒有關係的,不會互斥。

我們tcp的socket是全雙工的,鎖的這個特性也可以理解。

2).我們看for循環,首先讀取當前鎖的狀態存儲在old中,這裡有一個疑問,在存儲當前變數的時候,用了atomic.LoadUint64函數,我們知道這是一個原子,為什麼要用原子操作呢?這不就是一個對整型變數的賦值嗎?難道整型的賦值不是一個原子操作嗎?記得剛才我們看到的互斥鎖,在存儲當前鎖狀態的時候是直接賦值的。old := m.state這裡留一個個人疑問,後續確認原因後補充。 ok,繼續。接著判斷當前鎖是否已經被關閉了,如果被關閉直接返回false,加鎖失敗。 3).接著判斷鎖是否被佔用,如果未被佔用:把old佔用標誌位置為1,並且把當前鎖引用數+1存儲到new中,接著判斷new中的引用個數是否為0,如果為0就是一個異常了。這個判斷引起了我的一個思考: 什麼情況下會使得引用數為0呢,可能是異常的操作把其置為0,這是可以理解的。但是還有一個極端的情況,如果鎖的引用個數剛好為,你再加1,豈不是再去讀取引用個數就為0了,引用個數還是有一個極限的,當然,那些讀鎖等待數量與寫鎖等待數量同樣有這樣的問題。當然了,這種情況極少會發生,哪有那麼多的協程同時去操作一個鎖。 ok,繼續。如果判斷鎖已經被佔用:那麼就把old中的鎖等待數量加1然後放到new,接著同樣的道理檢測下new中的等待數量是否為0,如果為0則是異常情況。 4).接著就要修改當前鎖的狀態了,如果再第3)步中搶佔鎖成功,則會把鎖狀態改為搶佔成功的new,下面接著下面加一個判斷,old中的鎖是否被搶佔(這個等會有用),如果未被搶佔,則返回true,搶鎖成功;如果第3)步中搶佔鎖失敗,同樣把鎖狀態改為new(只是等待數量加1),下面的判斷就用到了,old的鎖在這時已經被搶佔的,代碼會往下走,調用runtime_Semacquire,看起來跟互斥鎖的runtime_SemacquireMutex很相似,作用應該差不多,把當前協程掛起,等待被喚醒。被喚醒後會再次循環,這個時候鎖的佔用狀態應該為false。 我們來看下調用runtime_Semacquire

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire(addr, semaBlockProfile)
}

好像他跟之前的互斥鎖最終調用的是一個函數,只是傳的第二個參數不同,我們看下互斥鎖是怎麼調用的

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32) {
semacquire(addr, semaBlockProfile|semaMutexProfile)
}

1.3 自旋鎖原理其實自旋鎖我們再闡述goilang的互斥鎖的時候,已經說的差不多,這裡直接總結下即可: 二者都是使用信號量來完成實現,只是互斥鎖會在鎖被佔用的情況下,進行block,然後等待被喚醒這需要CPU調度的,而自旋鎖則不會block,一直進行空耗CPU忙等,直到鎖被釋放,好處在於鎖被釋放時不需要CPU,立刻就能感知。但是自旋鎖不適合於單核CPU,因為本協程空耗CPU會影響佔用鎖的協程或者線程的處理。1.4 golang讀寫鎖原理讀寫所的原理我們應該都了解:讀與讀之間無影響,讀與寫之間互斥,寫與寫之間互斥。 golang實現讀寫鎖,一共有四個函數。RLock和RUnlock是讀操作,Lock和Unlock是寫操作。 下面上源碼:

type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
const rwmutexMaxReaders = 1 << 30

先看下結構體,首先看到讀寫鎖很明顯的需要用到互斥鎖Mutex,接著分別是寫和讀信號量,最後是當前讀的個數與當前等待讀的個數,注意這個等待個數是針對寫來說的,並非針對讀。 最後一個是rwmutexMaxReaders最大的同時讀的數量,稍後看源碼的時候就知道它的用處了。 先看讀的lock:

func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}

先看下讀的lock操作,首先嘗試把當前鎖的讀的個數加1,注意看後判斷下加1後的讀的個數是否小於0,為什麼會小於0呢?其實小於0的原因是當前有寫操作或者等待的寫操作,稍後我們看寫的操作的時候就會知道。如果小於0,就調用runtime_Semacquire(&rw.readerSem),哈!這個函數我們再熟悉不過了,就是把當前的協程sleep,等待被喚醒,注意傳進去的事讀的信號量喔。如果大於等0,那就是沒有寫,直接return即可。 接著看讀unlock:

func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem)
}
}
if race.Enabled {
race.Enable()
}
}

RUnlock過程稍微有點複雜: 1).首先把當前鎖的讀的個數減1,然後有一個判斷減1後是否小於0,小於我們知道是當前有寫或者等待寫的協程(實際上不會有正在寫的情況,因為寫操作會等待讀操作完畢後再寫,當前既然是讀的unlock,那麼就代表之前有讀,自然不會有正在寫),如果大於等於0直接返回即可。 2).繼續,如果小於0後進入if,首先會有一個判斷,如果為true,則報異常if r+1 == 0 || r+1 == -rwmutexMaxReadersr+1==0是什麼意思,就是說在上一步減1之前當前讀的個數為0,沒有讀操作而unlock,這肯定是異常了; r+1==-rwmutexMaxReaders我們放在後面解釋寫操作的時候解釋。 3).接著看,把當前鎖的等待讀的個數減1,然後判斷是否為0,我們在第1)步中已經解釋了,進入這個if代表當前有等待寫的協程,如果等待讀的個數為0就代表沒有等待讀的協程,可以進行寫了,所以代碼中調用了runtime_Semrelease(&rw.writerSem),注意這裡操作的是寫的信號量喔。 ok,我們繼續看寫操作的Lock

func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}

1).首先調用rw.w.Lock(),這個互斥鎖終於用到了,看起來這個互斥鎖只有在寫與寫之間使用。 2).接著,把鎖的讀的個數減去rwmutexMaxReaders,rwmutexMaxReaders這是一個極大的值,當前讀的個數減去它之後肯定為負了,這也正好解釋了我們在看讀的操作的時候為什麼要判斷當前的讀的個數是否小於0的原因了,讀寫之間就是用這樣的形式進行通信,然後把減去之後的值再加回來賦值r,r就是當前的讀操作的個數。 我們再回頭看看第2)步遺留的問題,r+1 == -rwmutexMaxReaders代表,在調用unlock之前readerCount為-rwmutexMaxReaders,它代表的含義是只有寫的操作,而沒有正在讀的操作,所以對讀的unlock是異常的。但是這有一個小問題,如果存在以下的情景: 剛開始沒有讀的操作,先過來一個寫的加鎖操作,readerCount==-rwmutexMaxReaders,然後緊接著有一個協程A進行讀,調用atomic.AddInt32(&rw.readerCount, 1),會使得readerCount==-rwmutexMaxReaders+1,然後阻塞,緊接著一個協程B對鎖不進行讀lock,直接對讀進行unlock,r+1 == -rwmutexMaxReaders+1,就會導致unlock成功。這是一個值得注意的地方,還好,golang有race競爭檢測用於幫助開發人員檢測出代碼的問題。我們看到這些鎖的代碼中都有race的影子。 3).然後進行如下一個判斷,如果為true則調用runtime_Semacquire(&rw.writerSem),釋放一個寫的信號量,喚醒一個等待寫的協程。if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0r != 0好理解,當前有讀的操作,這是一個&&操作,r!=0滿足後,才會進行atomic.AddInt32(&rw.readerWait, r),把當前讀鎖的個數加到等待讀的個數中,為什麼還要判斷加之後的值是否為0呢?看起來像是一定大於0呀,其實不是的,還是有可能等於0的。該情景如下: 有一個協程A調用讀操作,這時readerCount==1,讀完後調用RUnlock,代碼剛進入函數還沒怎麼走,這時,剛好有一個協程B進行讀操作,剛好走完下面這行代碼r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders此時readerCount==-rwmutexMaxReaders+1 然後A協程的RUnlock繼續往下走,剛好走完如下這行代碼(真XX巧,哈哈)if atomic.AddInt32(&rw.readerWait, -1) == 0這時,你會發現readerWait==-1,然後呢,協程B不甘示弱,要走這行我們關注的代碼了if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0此時r==1,readerWait==-1,相加是不是等於0了,這代表什麼,唯一的讀操作完畢了,寫操作你不用阻塞了,直接lock成功,好神奇。 ok,我們繼續看寫操作的Unlock源碼

func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Release(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem)
}
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}

1).首先把之前減的rwmutexMaxReaders加回來,這是readerCount就是當前讀的個數,注意這個是有在寫操作的這段時間,新加入的讀的個數,這些讀操作都陷入了阻塞。把加之後的readerCount賦值給r。 2).接下來會有一個異常判斷if r >= rwmutexMaxReaders什麼情況下為true,當然是之前我沒有lock過,readerCount沒減過rwmutexMaxReaders,你加了之後會大於它。 3).進入一個循環,這些因為寫而陷入阻塞的讀操作全部喚醒 4).然後釋放互斥鎖 至此,所有的鎖的源碼,我們都閱讀完畢。實際上我們還沒解答為什麼golang的socket為什麼要加鎖呢?這個我們再稍後會解釋。
推薦閱讀:
相关文章