sync.WaitGroup常規用法

通俗點說,兩個角色,一種goroutine作為一個worker(他是個小弟),老老實實幹活。另一種goroutine作為管理者督促小弟幹活(它自己也是個worker)。

在有很多小弟幹活時,管理者沒事幹歇著,但同時它又希望得到一個通知,知道小弟們什麼時候幹完活(所有小弟們一個不少全都幹完活了)。這樣管理者好對小弟的工作成果做驗收。

如果沒有sync.WaitGroup,怎麼實現?

其實也不難,從程序開發角度看,就是維護一個小弟總數和一個通道。每個小弟幹完活,就往通道發一個空消息,

管理者阻塞在通道的監聽上。來一個消息就說明有一個小弟幹完活了,記錄下有多少個消息,消息個數和小弟總數一致。就說明全乾活了,管理者關閉通道,驗收小弟工作成果。

寫成代碼就是這樣子

workers := 3
ch := make(chan struct{})
worker := func() {
// 幹活幹活幹活
ch <- struct{}{} // 通知管理者
}
leader := func() {
cnt := 0
for range ch {
cnt++
if cnt == workers {
break
}
}
close(ch)
// 檢查工作成果
}
go leader()
for i := 0; i < workers; i++ {
go worker()
}

改成sync.Waitgroup實現同樣的功能就成這樣子

wg := sync.WaitGroup{}
workers := 3
wg.Add(workers)
worker := func() {
defer wg.Done()
// 幹活幹活幹活
}
leader := func() {
wg.Wait()
// 檢查工作成果
}
go leader()
for i := 0; i < workers; i++ {
go worker()
}

Add,Done,Wait。三招完事。

語義很清晰。

知識點:sync.WaitGroup可以解決同步阻塞等待的問題。一個人等待一堆人幹完活的問題得到優雅解決。

到此為止就是sync.WaitGroup的常規用法了。舉一反三,可能還想到其它用法?文章最後一部分揭曉 :P

實現原理

根據語義猜測下,肯定是離不開阻塞喚醒機制和次數加減。而且是並發環境,那麼次數加減要CAS。最後還要記錄下阻塞的goroutine個數,因為要把挨個他們喚醒。

本文原理不多寫,簡單介紹下數據結構,再給出帶注釋的源碼,大家自行理解下。(如果看過《一份詳細注釋的go Mutex源碼》應該會很容易理解)

數據結構:

type WaitGroup struct {
noCopy noCopy
state1 [12]byte
sema uint32
}

如圖,除了state1其它沒什麼好說的。

state1是12位元組,但圖裡面只有8位元組。原因是32位編譯器的問題,在取state1時是做了特殊處理。

func (wg *WaitGroup) state() *uint64 {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)) // 32位系統
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[4])) // 64位系統
}
}

Add、Done和Wait注釋源碼

func (wg *WaitGroup) Add(delta int) {
statep := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 計數器
w := uint32(state) // 等待者個數。這裡用uint32,會直接截斷了高位32位,留下低32位
if v < 0 {
// Done的執行次數超出Add的數量
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
// 最開始時,Wait不能在Add之前被執行
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
// 計數器不為零,還有沒Done的。return
// 沒有等待者。return
return
}

// 所有goroutine都完成任務了,但有goroutine執行了Wait後被阻塞,需要喚醒它

if *statep != state {
// 已經到了喚醒階段了,就不能同時並發Add了
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 清零之後,就可以繼續Add和Done了
*statep = 0
for ; w != 0; w-- {
// 喚醒
runtime_Semrelease(&wg.sema, false)
}
}

func (wg *WaitGroup) Done() {
wg.Add(-1)
}

func (wg *WaitGroup) Wait() {
statep := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 計數器
w := uint32(state) // 等待者個數
if v == 0 {
// 如果聲明變數後,直接執行Wait也不會有問題
// 下面CAS操作失敗,重試,但剛好發現計數器變成零了,安全退出
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
race.Write(unsafe.Pointer(&wg.sema))
}
// 掛起當前的g
runtime_Semacquire(&wg.sema)
// 被喚醒後,計數器不應該大於0
// 大於0意味著Add的數量被Done完後,又開始了新一波Add
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}

舉一反三

前文說過常規用法是解決一個人等待一堆人幹完活的問題。

那反過來,一堆人等一個人幹完活呢?或者一堆人等另一堆人幹完活呢?

Add方法裏最後的for循環代碼告訴我們是可以的。

for ; w != 0; w-- {
// 喚醒全部被阻塞的goroutine
runtime_Semrelease(&wg.sema, false)
}

這樣子就有點意思了。sync.WaitGroup就有點像發布訂閱,只不過訂閱者收到的不是消息,而是一種事件信號。

singleflight就是這樣的例子。它解決了一堆人等一個人幹完活的問題。就比如現在有100個線程同時請求資料庫中同一行數。但只能有一個線程能讀庫,其他線程都阻塞等待它的結果。

源碼也是短小精悍。其實仔細看,在高並發的情況下,singleflight的保證是分批式的。因為它會delete操作,只要delete操作搶鎖成功,後來者們就組成新的一批,而這一批保證只有一個goroutine被執行。

使用singlefilght也有要注意的地方,fn的錯誤重試要自己處理;fn的耗時會成為別的goroutine最低耗時。

func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
// 一堆人都阻塞在這兒等一個人幹完活
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

c.val, c.err = fn()
c.wg.Done()

g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()

return c.val, c.err
}

所以,一堆人等另一堆人幹完活問題的思路也很簡單。就不介紹啦。

有更溜的用法也歡迎留言。


推薦閱讀:
相關文章