WaitGroup
基本流程
- waitgroup 维护两个计数器
- v 请求计数器
- w 等待计数器
- 两个计数器共同组成一个64bit,请求计数器占用高32位,等待计数器占用低32位
- 请求计数器v:Add执行 v 加1,Done执行 v 减1,当v为0时,通过信号量唤醒Wait()
- 等待计数器w:同一实例有多个wait时,Wait()执行一次w加1
一旦有Wait()被唤醒触发,则不能再Add
Add
源码解析 ==>
func (wg *WaitGroup) Add (delta int) {
// 首先获取状态值
statep, semap := wg.state ()
// 对于 statep 中 counter + delta
state := atomic.AddUint64 (statep, uint64 (delta)<<32)
// 获取任务计数器的值
v := int32 (state >> 32)
// 获取等待者计数器的值
w := uint32 (state)
// 任务计数器不能为负数
if v < 0 {
panic ("sync: negative WaitGroup counter")
}
// 已经有人在等待,但是还在添加任务
if w != 0 && delta > 0 && v == int32 (delta) {
panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 没有等待者或者任务还有没做完的
if v > 0 || w == 0 {
return
}
// 有等待者,但是在这个过程中数据还在变动
if *statep != state {
panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
// 重置状态,并用发出等同于等待者数量的信号量,告诉所有等待者任务已经完成
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease (semap, false, 0)
}
}
- Wait 的过程中是不能 Add 的,不然就会 panic,要注意
- 虽然我们可以借助 Add 一个负数来一次性结束多个任务,但是如果任务数量控制的不好,变成负数也会 panic,Done 次数多了也一样
- wg 是通过信号量来通知的,当然可以有很多人在等,wg 它都会一一通知到位的
Wait
源码解析 ==>
func (wg *WaitGroup) Wait () {
// 先获取状态
statep, semap := wg.state ()
for {
// 这里注意要用 atomic 的 Load 来保证一下写操作已经完成
state := atomic.LoadUint64 (statep)
// 同样的,这里是任务计数
v := int32 (state >> 32)
// 这里是等待者计数
w := uint32 (state)
// 如果没有任务,那么直接结束,不用等待了
if v == 0 {
return
}
// 使用 cas 操作,如果不相等,证明中间已经被其他人修改了状态,重新走 for 循环
// 注意这里 if 进去之后等待者的数量就 +1 了
if atomic.CompareAndSwapUint64 (statep, state, state+1) {
// 等待信号量
runtime_Semacquire (semap)
// 如果信号量来了,但是状态还不是 0,则证明 wait 之后还是在人在 add,证明有人想充分利用 wg 但是时机不对
if *statep != 0 {
panic ("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
- 通过 load 和 cas 操作 + 循环来避免了锁,其实这个操作可以学一下
- 从这里也看得出来,wg 可以重用,但是必须等到 wait 全部完成之后再说
Done
源码解析 ==>
func (wg *WaitGroup) Done () {
wg.Add (-1)
}
可以就看出来,可以传个大于1的数,一次结束多个任务。