Skip to content

golang WaitGroup 实现原理

Posted on:October 31, 2023 at 06:25 PM

WaitGroup


基本流程

一旦有Wait()被唤醒触发,则不能再Add

Add

源码解析 ==>

func (wg *WaitGroup) Add (delta int) {
  // 首先获取状态值
  statep, semap := wg.state ()
  // 对于 statep 中 counter + delta
  state := atomic.AddUint64 (statep, uint64 (delta)<<32)
  // 获取任务计数器的值
  v := int32 (state >> 32)
  // 获取等待者计数器的值
  w := uint32 (state)

  // 任务计数器不能为负数
  if v < 0 {
      panic ("sync: negative WaitGroup counter")
  }
  // 已经有人在等待,但是还在添加任务
  if w != 0 && delta > 0 && v == int32 (delta) {
      panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
  }
  // 没有等待者或者任务还有没做完的
  if v > 0 || w == 0 {
      return
  }
  // 有等待者,但是在这个过程中数据还在变动
  if *statep != state {
      panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
  }

  // Reset waiters count to 0.
  // 重置状态,并用发出等同于等待者数量的信号量,告诉所有等待者任务已经完成
  *statep = 0
  for ; w != 0; w-- {
      runtime_Semrelease (semap, false, 0)
  }
}

Wait

源码解析 ==>

func (wg *WaitGroup) Wait () {
  // 先获取状态
  statep, semap := wg.state ()

  for {
      // 这里注意要用 atomic 的 Load 来保证一下写操作已经完成
      state := atomic.LoadUint64 (statep)
      // 同样的,这里是任务计数
      v := int32 (state >> 32)
      // 这里是等待者计数
      w := uint32 (state)
      // 如果没有任务,那么直接结束,不用等待了
      if v == 0 {
          return
      }
      // 使用 cas 操作,如果不相等,证明中间已经被其他人修改了状态,重新走 for 循环
      // 注意这里 if 进去之后等待者的数量就 +1 了
      if atomic.CompareAndSwapUint64 (statep, state, state+1) {
          // 等待信号量
          runtime_Semacquire (semap)
          // 如果信号量来了,但是状态还不是 0,则证明 wait 之后还是在人在 add,证明有人想充分利用 wg 但是时机不对
          if *statep != 0 {
              panic ("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
      }
  }
}

Done

源码解析 ==>

func (wg *WaitGroup) Done () {
    wg.Add (-1)
}

可以就看出来,可以传个大于1的数,一次结束多个任务。