1、waitgroup是什么?有什么作用?

用来阻塞主协程,可以等待所有协程执行完。

2、使用方法

总共三个方法Add(n)【n为总共要等待的协程数】,Done【在协程中调,相当于Add(-1)】Wait【等待阻塞结束】

func main() {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
time.Sleep(time.Second)
fmt.Println("rountine 1")
}()
go func() {
defer wg.Done()
time.Sleep(time.Second)
fmt.Println("rountine 2")
}()
wg.Wait()
fmt.Println("total")
}

3、实现原理

  • waitgroup结构体中state1的格式很重要。共占12个位元组。因为64位原子操作需要64位对齐,但32位编译器无法确保它。所以分配12个位元组,然后使用其中的对齐8个位元组作为状态数,另外4个作为存储sema。

  • 调用Add时,操作高位的计数器值进行加减;

Add方法源码剖析:
state := atomic.AddUint64(statep, uint64(delta)<<32)// statep为状态位的值,delta为要add的值,将其左移32位,后相加到高32位,完成计数器加减。
v := int32(state >> 32) // 拿到当前计数器的值。state状态值【包含高32位的计数器值和低32位的等待数量值】右移32位得到计数器真正的数
w := uint32(state) // 等待者数量的值 取的低32位

…… // 如果计数值大于0,直接return;

// 如果计数值=0,开始释放wait信号
for ; w != 0; w-- {
runtime_Semrelease(semap, false)
}

  • 调用wait时,低32位的wait数会累加,如果获取到释放的信号后且计数器是0,则调用原语递减信号量的值 结束阻塞。

摘自wait方法源码:
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {//原子加
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}

推荐阅读:

相关文章