你真的會用sync.WaitGroup嗎
sync.WaitGroup常規用法
通俗點說,兩個角色,一種goroutine作為一個worker(他是個小弟),老老實實幹活。另一種goroutine作為管理者督促小弟幹活(它自己也是個worker)。
在有很多小弟幹活時,管理者沒事幹歇著,但同時它又希望得到一個通知,知道小弟們什麼時候幹完活(所有小弟們一個不少全都幹完活了)。這樣管理者好對小弟的工作成果做驗收。
如果沒有sync.WaitGroup,怎麼實現?
其實也不難,從程序開發角度看,就是維護一個小弟總數和一個通道。每個小弟幹完活,就往通道發一個空消息,
管理者阻塞在通道的監聽上。來一個消息就說明有一個小弟幹完活了,記錄下有多少個消息,消息個數和小弟總數一致。就說明全乾活了,管理者關閉通道,驗收小弟工作成果。
寫成代碼就是這樣子
workers := 3
ch := make(chan struct{})
worker := func() {
// 幹活幹活幹活
ch <- struct{}{} // 通知管理者
}
leader := func() {
cnt := 0
for range ch {
cnt++
if cnt == workers {
break
}
}
close(ch)
// 檢查工作成果
}
go leader()
for i := 0; i < workers; i++ {
go worker()
}
改成sync.Waitgroup實現同樣的功能就成這樣子
wg := sync.WaitGroup{}
workers := 3
wg.Add(workers)
worker := func() {
defer wg.Done()
// 幹活幹活幹活
}
leader := func() {
wg.Wait()
// 檢查工作成果
}
go leader()
for i := 0; i < workers; i++ {
go worker()
}
Add,Done,Wait。三招完事。
語義很清晰。
知識點:sync.WaitGroup可以解決同步阻塞等待的問題。一個人等待一堆人幹完活的問題得到優雅解決。
到此為止就是sync.WaitGroup的常規用法了。舉一反三,可能還想到其它用法?文章最後一部分揭曉
實現原理
根據語義猜測下,肯定是離不開阻塞喚醒機制和次數加減。而且是並發環境,那麼次數加減要CAS。最後還要記錄下阻塞的goroutine個數,因為要把挨個他們喚醒。
本文原理不多寫,簡單介紹下數據結構,再給出帶注釋的源碼,大家自行理解下。(如果看過《一份詳細注釋的go Mutex源碼》應該會很容易理解)
數據結構:
type WaitGroup struct {
noCopy noCopy
state1 [12]byte
sema uint32
}
如圖,除了state1其它沒什麼好說的。
state1是12位元組,但圖裡面只有8位元組。原因是32位編譯器的問題,在取state1時是做了特殊處理。
func (wg *WaitGroup) state() *uint64 {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)) // 32位系統
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[4])) // 64位系統
}
}
Add、Done和Wait注釋源碼
func (wg *WaitGroup) Add(delta int) {
statep := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 計數器
w := uint32(state) // 等待者個數。這裡用uint32,會直接截斷了高位32位,留下低32位
if v < 0 {
// Done的執行次數超出Add的數量
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
// 最開始時,Wait不能在Add之前被執行
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
// 計數器不為零,還有沒Done的。return
// 沒有等待者。return
return
}
// 所有goroutine都完成任務了,但有goroutine執行了Wait後被阻塞,需要喚醒它
if *statep != state {
// 已經到了喚醒階段了,就不能同時並發Add了
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 清零之後,就可以繼續Add和Done了
*statep = 0
for ; w != 0; w-- {
// 喚醒
runtime_Semrelease(&wg.sema, false)
}
}
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
func (wg *WaitGroup) Wait() {
statep := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 計數器
w := uint32(state) // 等待者個數
if v == 0 {
// 如果聲明變數後,直接執行Wait也不會有問題
// 下面CAS操作失敗,重試,但剛好發現計數器變成零了,安全退出
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
race.Write(unsafe.Pointer(&wg.sema))
}
// 掛起當前的g
runtime_Semacquire(&wg.sema)
// 被喚醒後,計數器不應該大於0
// 大於0意味著Add的數量被Done完後,又開始了新一波Add
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
舉一反三
前文說過常規用法是解決一個人等待一堆人幹完活的問題。
那反過來,一堆人等一個人幹完活呢?或者一堆人等另一堆人幹完活呢?
Add方法裏最後的for循環代碼告訴我們是可以的。
for ; w != 0; w-- {
// 喚醒全部被阻塞的goroutine
runtime_Semrelease(&wg.sema, false)
}
singleflight就是這樣的例子。它解決了一堆人等一個人幹完活的問題。就比如現在有100個線程同時請求資料庫中同一行數。但只能有一個線程能讀庫,其他線程都阻塞等待它的結果。
源碼也是短小精悍。其實仔細看,在高並發的情況下,singleflight的保證是分批式的。因為它會delete操作,只要delete操作搶鎖成功,後來者們就組成新的一批,而這一批保證只有一個goroutine被執行。
使用singlefilght也有要注意的地方,fn的錯誤重試要自己處理;fn的耗時會成為別的goroutine最低耗時。
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
// 一堆人都阻塞在這兒等一個人幹完活
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
所以,一堆人等另一堆人幹完活問題的思路也很簡單。就不介紹啦。
有更溜的用法也歡迎留言。