One of Go's most important features is select. An important use of select is dealing with data races: when multiple goroutines read and write the same data, readers may observe inconsistent state. select serializes concurrent reads and writes and thus avoids that inconsistency. Of course, synchronization primitives (Mutex, RWMutex, etc.) and atomic operations (sync/atomic) can also solve data-race problems. The implementation of select lives in the runtime package of the Go source tree, at ./src/runtime/select.go. This article attempts a simple reading and analysis of that source.
The implementation of select differs between Go versions; it was substantially refactored in later releases. This article analyzes the Go 1.21.1 source.
First, let's look at how select is used:
select {
case c1 <- 1:
	// do something
case <-c2:
	// do something
default:
	// do something
}
The code above goes through the following stages:
create the select –> register the cases –> run the select –> release the select
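To make those stages concrete, here is a minimal runnable sketch (the channel names are made up for illustration): with a buffered channel the send case is immediately ready, and a ready case always wins over default.

```go
package main

import "fmt"

// trySelect exercises all three kinds of case and reports which branch ran.
func trySelect(c1 chan int, c2 chan int) string {
	select {
	case c1 <- 1:
		return "sent to c1"
	case <-c2:
		return "received from c2"
	default:
		return "default"
	}
}

func main() {
	c1 := make(chan int, 1) // buffered: the send case is immediately ready
	c2 := make(chan int)    // unbuffered and empty: the recv case is not ready
	fmt.Println(trySelect(c1, c2)) // prints "sent to c1": a ready case wins over default
}
```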
The main functions select relies on:
runtime.reflect_rselect() //entry point, linked from the reflect package
runtime.selectgo() //choose a case and execute it
runtime.selectsetpc() //record the caller's PC for each case (used by the race detector)
runtime.sellock() //lock the channels of the cases
runtime.selunlock() //unlock the channels of the cases
Several constants are involved:
// These values must match ../reflect/value.go:/SelectDir.
type selectDir int
const (
_ selectDir = iota // 0: represents a nil case; this can happen when a send or recv is performed on a nil channel
selectSend // 1: the case is a send, ch <-
selectRecv // 2: the case is a receive, <-ch
selectDefault // 3: the default branch
)
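The nil-channel situation mentioned above is observable from user code: a case on a nil channel can never proceed, so select simply skips it (a small demonstration of my own, not from the runtime source):

```go
package main

import "fmt"

// pick runs a select in which one case sits on a nil channel; that case can
// never proceed, so only the ready channel or default is ever chosen.
func pick(ready chan int) string {
	var nilCh chan int // nil: a recv on it blocks forever, so this case is never ready
	select {
	case <-nilCh:
		return "nil channel" // unreachable
	case v := <-ready:
		return fmt.Sprintf("got %d", v)
	default:
		return "default"
	}
}

func main() {
	ready := make(chan int, 1)
	ready <- 42
	fmt.Println(pick(ready)) // prints "got 42": the nil-channel case is skipped
}
```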
In addition, several important structs are involved:
/**
The structure describing one select case, as passed in from reflect
*/
// A runtimeSelect is a single case passed to rselect.
// This must match ../reflect/value.go:/runtimeSelect
type runtimeSelect struct {
dir selectDir
typ unsafe.Pointer // channel type (not used here)
ch *hchan // channel
val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
}
/**
The definition of each case inside a select
*/
// Select case descriptor.
// Known to compiler.
// Changes here must also be made in src/cmd/compile/internal/walk/select.go's scasetype.
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}
The two fields of scase describe a channel and the value being sent or received.
Moreover, comparing with older versions of the source, the new runtimeSelect improves on the old hselect: the embedded cases array is gone, data and instruction addresses are obtained directly via the PC and via pointers straight into the stack data, bringing the call path closer to the metal and making it faster and more efficient.
Let's continue:
//The returned int is the number of the runnable case; the bool, for a receive, reports whether data was successfully read from the channel
//go:linkname reflect_rselect reflect.rselect
func reflect_rselect(cases []runtimeSelect) ( int, bool) {
if len(cases) == 0 {
//This is in fact a direct call to gopark; here the call is
//gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1),
//which parks the current goroutine forever. Go's deadlock detector then sees a goroutine
//that can never be woken and panics.
//Below this it goes into an mcall assembly call, worth studying separately
block()
}
sel := make([]scase, len(cases))
orig := make([]int, len(cases))
nsends, nrecvs := 0, 0
dflt := -1
//Convert the cases passed into select into an scase array
for i, rc := range cases {
var j int
//Count send and recv cases by direction; the way sel is filled is quite clever:
//the handling of j guarantees the front of sel holds the send cases and the back holds the recv cases
switch rc.dir {
case selectDefault:
dflt = i
continue
case selectSend:
j = nsends
nsends++
case selectRecv:
nrecvs++
j = len(cases) - nrecvs
}
//If it is a send, store from the front of sel; if a recv, store backwards from the end.
//Sends and recvs are thus stored in separate segments. Because of a possible default case,
//this layout can leave an empty gap in the middle of sel
sel[j] = scase{c: rc.ch, elem: rc.val}
orig[j] = i
}
//nsends+nrecvs == 0 means there is only a default case, so select it directly
// Only a default case.
if nsends+nrecvs == 0 {
return dflt, false
}
//This handles the possible gap in the middle of sel
// Compact sel and orig if necessary.
if nsends+nrecvs < len(cases) {
copy(sel[nsends:], sel[len(cases)-nrecvs:])
copy(orig[nsends:], orig[len(cases)-nrecvs:])
}
order := make([]uint16, 2*(nsends+nrecvs))
var pc0 *uintptr
//Is data race detection enabled? Initialized by the -race build flag
if raceenabled {
pcs := make([]uintptr, nsends+nrecvs)
//Loop storing the current PC value (the next instruction address the caller will execute)
for i := range pcs {
selectsetpc(&pcs[i])
}
pc0 = &pcs[0]
}
//selectgo chooses the case to execute next
chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)
//Pick the case according to selectgo's return value
// Translate chosen back to caller's ordering.
if chosen < 0 {
chosen = dflt
} else {
chosen = orig[chosen]
}
return chosen, recvOK
}
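The front/back placement and the compaction step above can be reproduced in isolation. The sketch below is my own simplification, with plain int directions standing in for the runtime types; it mirrors the j computation: sends fill the front, recvs fill the back, and the tail segment is copied down over the gap left by default.

```go
package main

import "fmt"

const (
	dirDefault = 0
	dirSend    = 1
	dirRecv    = 2
)

// partition mimics reflect_rselect's placement: sends from the front,
// recvs from the back, then compaction to close the gap left by default.
func partition(dirs []int) []int {
	sel := make([]int, len(dirs)) // stores the original index of each case
	nsends, nrecvs := 0, 0
	for i, d := range dirs {
		var j int
		switch d {
		case dirDefault:
			continue
		case dirSend:
			j = nsends
			nsends++
		case dirRecv:
			nrecvs++
			j = len(dirs) - nrecvs
		}
		sel[j] = i
	}
	// Compact: move the recv segment down so sends and recvs are contiguous.
	if nsends+nrecvs < len(dirs) {
		copy(sel[nsends:], sel[len(dirs)-nrecvs:])
	}
	return sel[:nsends+nrecvs]
}

func main() {
	// cases: send, recv, default, send, recv (original indices 0..4)
	fmt.Println(partition([]int{dirSend, dirRecv, dirDefault, dirSend, dirRecv}))
	// prints [0 3 4 1]: sends 0,3 up front, recvs 4,1 (reversed) behind
}
```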
As you can see, this function is really triggered externally by the rselect() function in the reflect package (to see why, read up on how //go:linkname works). Its main job is to pick, from the several cases of the select control structure, the one case that should execute.
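From user code, the public entry to this path is reflect.Select, which packs each case into a reflect.SelectCase and ultimately reaches rselect/selectgo. A small sketch:

```go
package main

import (
	"fmt"
	"reflect"
)

// chooseWithReflect builds select cases dynamically and runs them through
// reflect.Select, the public wrapper over runtime's rselect.
func chooseWithReflect(ch chan int) (chosen, value int, ok bool) {
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)},
		{Dir: reflect.SelectDefault},
	}
	i, v, recvOK := reflect.Select(cases)
	if i == 0 {
		return i, int(v.Int()), recvOK
	}
	return i, 0, false
}

func main() {
	ch := make(chan int, 1)
	ch <- 7
	fmt.Println(chooseWithReflect(ch)) // prints "0 7 true": the recv case fires
}
```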
Next, let's look at how selectgo chooses a case:
// selectgo implements the select statement.
//
// cas0 points to an array of type [ncases]scase, and order0 points to
// an array of type [2*ncases]uint16 where ncases must be <= 65536.
// Both reside on the goroutine's stack (regardless of any escaping in
// selectgo).
//
// For race detector builds, pc0 points to an array of type
// [ncases]uintptr (also on the stack); for other builds, it's set to
// nil.
//
// selectgo returns the index of the chosen scase, which matches the
// ordinal position of its respective select{recv,send,default} call.
// Also, if the chosen scase was a receive operation, it reports whether
// a value was received.
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
if debugSelect {
print("select: cas0=", cas0, "\n")
}
// NOTE: In order to maintain a lean stack size, the number of scases
// is capped at 65536.
//1 << 16 = 65536, the maximum capacity of the scase array
//1 << 17 = 131072: order1 is a buffer twice the scase length, used to hold pollorder and lockorder
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
//pollorder is re-shuffled on every selectgo call, so cases are polled in random order
//lockorder holds the order in which the channels are locked
pollorder := order1[:ncases:ncases]
//lockorder is the second half of the order1 buffer, after the first ncases entries
lockorder := order1[ncases:][:ncases:ncases]
// NOTE: pollorder/lockorder's underlying array was not zero-initialized by compiler.
// Even when raceenabled is true, there might be select
// statements in packages compiled without -race (e.g.,
// ensureSigM in runtime/signal_unix.go).
//race detection
var pcs []uintptr
if raceenabled && pc0 != nil {
pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
//pcs is the first ncases entries of pc1, with length ncases
pcs = pc1[:ncases:ncases]
}
casePC := func(casi int) uintptr {
if pcs == nil {
return 0
}
return pcs[casi]
}
//Block-profiling switch; if enabled, start timing
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// The compiler rewrites selects that statically have
// only 0 or 1 cases plus default into simpler constructs.
// The only way we can end up with such small sel.ncase
// values here is for a larger select in which most channels
// have been nilled out. The general code handles those
// cases correctly, and they are rare enough not to bother
// optimizing (and needing to test).
// generate permuted order
//shuffle the order of scases
norder := 0
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]
// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
//Build a max-heap (sift-up) over lockorder, seeded from pollorder and keyed by channel address
for i := range lockorder {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
//Heap extraction: repeatedly move the max to the end and sift down, leaving lockorder sorted by the channels' heap addresses
for i := len(lockorder) - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
if debugSelect {
for i := 0; i+1 < len(lockorder); i++ {
if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
throw("select: broken sort")
}
}
}
// lock all the channels involved in the select
sellock(scases, lockorder)
//That was the preparation; now comes choosing and executing a case
var (
gp *g
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
// pass 1 - look for something already waiting
var casi int
var cas *scase
var caseSuccess bool
var caseReleaseTime int64 = -1
var recvOK bool
//Walk pollorder and perform the receive/send on any channel that is ready; since pollorder has already been shuffled, walking it sequentially is random with respect to the original case order
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
//Compare casi with nsends to decide between receive and send,
//because the setup stored sends and recvs in separate segments:
//casi >= nsends means a receive operation
if casi >= nsends {
//Is there a sender waiting with data?
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
//The buffer has data
if c.qcount > 0 {
goto bufrecv
}
//The channel is closed
if c.closed != 0 {
goto rclose
}
} else { //send operation
if raceenabled {
racereadpc(c.raceaddr(), casePC(casi), chansendpc)
}
//The channel is closed
if c.closed != 0 {
goto sclose
}
//Is a goroutine waiting in the receive queue? If so, hand it the data directly
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
//The buffer has free space
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}
//If non-blocking (i.e., there is a default), unlock all the channels and prepare to leave the select
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}
//Block: enqueue on the wait queues
// pass 2 - enqueue on all chans
gp = getg()
if gp.waiting != nil {
throw("gp.waiting != nil")
}
//Below, a sudog is created for each case and added to the current g's waiting list
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
// Create a sudog to manage the current g.
// This is handled the same way as a blocking chan operation: save the current g,
// because the g is about to be parked
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
//Note: this wakeup record <sudog> is bound to this case's channel
sg.c = c
// Construct waiting list in lock order.
// The sudog structs are chained into a list attached to the current goroutine
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
// Call gopark to park the current goroutine and wait for the scheduler to wake it.
// The sudogs we created were enqueued on each chan; once any one of them is woken,
// execution continues after gopark
// wait for someone to wake us up
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false
sellock(scases, lockorder)
// After gopark returns, gp.param should be the sudog that caused the wakeup
gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// After waking, clear some of the data in the sudogs we created earlier
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
for _, casei := range lockorder {
k = &scases[casei]
// Find the woken sudog
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
//record the chosen case
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
// For a sudog that is not the woken one, we still need to dequeue it from the
// other cases' channel queues; the woken sudog was already dequeued
// by its channel, so nothing more is needed for it
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
// Keep walking toward the woken sudog, like an iterator
sgnext = sglist.waitlink
sglist.waitlink = nil
// The sudogs we created earlier are no longer needed; clean them up
releaseSudog(sglist)
sglist = sgnext
}
if cas == nil {
throw("selectgo: bad wakeup")
}
// This case's chan is the one whose data woke us; we can now operate on it
c = cas.c
if debugSelect {
print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
}
// The point here: if our sudog was woken because the chan was closed,
// take the sclose path, which panics, because you cannot send into a closed chan;
// you can, however, still read from a closed chan
if casi < nsends {
if !caseSuccess {
goto sclose
}
} else {
recvOK = caseSuccess
}
if raceenabled {
if casi < nsends {
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
} else if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
}
}
if msanenabled {
if casi < nsends {
msanread(cas.elem, c.elemtype.Size_)
} else if cas.elem != nil {
msanwrite(cas.elem, c.elemtype.Size_)
}
}
if asanenabled {
if casi < nsends {
asanread(cas.elem, c.elemtype.Size_)
} else if cas.elem != nil {
asanwrite(cas.elem, c.elemtype.Size_)
}
}
//unlock all cases
selunlock(scases, lockorder)
goto retc
//receive data from the buffer
bufrecv:
// can receive from buffer
if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
}
racenotify(c, c.recvx, nil)
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.Size_)
}
if asanenabled && cas.elem != nil {
asanwrite(cas.elem, c.elemtype.Size_)
}
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
//send data into the buffer
bufsend:
// can send to buffer
if raceenabled {
racenotify(c, c.sendx, nil)
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.Size_)
}
if asanenabled {
asanread(cas.elem, c.elemtype.Size_)
}
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
//receive data directly from a sleeping sender (sg)
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncrecv: cas0=", cas0, " c=", c, "\n")
}
recvOK = true
goto retc
//read from a channel that has already been closed
rclose:
// read at end of closed channel
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {
raceacquire(c.raceaddr())
}
goto retc
//send data to a sleeping receiver (sg)
send:
// can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.Size_)
}
if asanenabled {
asanread(cas.elem, c.elemtype.Size_)
}
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: cas0=", cas0, " c=", c, "\n")
}
goto retc
retc:
if caseReleaseTime > 0 {
blockevent(caseReleaseTime-t0, 1)
}
return casi, recvOK
//send on a channel that has already been closed
sclose:
// send on closed channel
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
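The lockorder construction in the middle of selectgo is a textbook heap sort: a sift-up pass builds a max-heap, then a sift-down extraction pass sorts it. Extracted here as a standalone sketch of my own, sorting indices by a plain key slice instead of c.sortkey():

```go
package main

import "fmt"

// heapSortIndices sorts the indices 0..len(keys)-1 ascending by keys[i],
// using the same sift-up build and sift-down extraction as selectgo.
func heapSortIndices(keys []uintptr) []int {
	n := len(keys)
	order := make([]int, n)
	// Build a max-heap by sifting each new element up.
	for i := 0; i < n; i++ {
		j := i
		for j > 0 && keys[order[(j-1)/2]] < keys[i] {
			order[j] = order[(j-1)/2]
			j = (j - 1) / 2
		}
		order[j] = i
	}
	// Pop the max to the end and sift the displaced root down.
	for i := n - 1; i >= 0; i-- {
		o := order[i]
		order[i] = order[0]
		j := 0
		for {
			k := j*2 + 1
			if k >= i {
				break
			}
			if k+1 < i && keys[order[k]] < keys[order[k+1]] {
				k++
			}
			if keys[o] < keys[order[k]] {
				order[j] = order[k]
				j = k
				continue
			}
			break
		}
		order[j] = o
	}
	return order
}

func main() {
	fmt.Println(heapSortIndices([]uintptr{30, 10, 20})) // prints [1 2 0]: keys in ascending order
}
```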
So selectgo mainly does four things:
- Shuffle the cases into pollorder, and sort them by channel address into lockorder.
- First pass: walk the already-shuffled pollorder and perform the send/receive on any ready channel.
- Second pass: walk lockorder and enqueue the current goroutine on every case's channel send/receive queue (sendq/recvq), then wait to be woken.
- After the goroutine is woken, find the channel that satisfied a case and handle it.
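The randomized first pass over pollorder is observable from user code: when two channels are ready at the same time, repeated selects end up taking both branches (a small demonstration of my own, not from the runtime source):

```go
package main

import "fmt"

// countChoices runs a select n times with both channels always ready and
// tallies which case fires; the shuffled pollorder makes both show up.
func countChoices(n int) (first, second int) {
	for i := 0; i < n; i++ {
		c1 := make(chan int, 1)
		c2 := make(chan int, 1)
		c1 <- 1
		c2 <- 2
		select {
		case <-c1:
			first++
		case <-c2:
			second++
		}
	}
	return first, second
}

func main() {
	a, b := countChoices(1000)
	fmt.Println(a > 0, b > 0) // both branches get chosen over many runs
}
```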
Polling
The channel polling order is generated randomly by the fastrandn function, which means that when several channels are ready at the same time, select picks one of them at random. lockorder, by contrast, is determined by channel address: locking channels in a consistent order prevents deadlock, and the subsequent sellock call locks all the channels in exactly that generated order.
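The pollorder loop shown earlier is an inside-out Fisher–Yates shuffle. Here is a standalone sketch, with math/rand standing in for the runtime's fastrandn:

```go
package main

import (
	"fmt"
	"math/rand"
)

// shuffleOrder mirrors the pollorder construction: for each case i, pick
// j uniformly in [0, norder], move the entry at j to the end, and put i
// at j. This inside-out Fisher–Yates yields a uniform random permutation.
func shuffleOrder(ncases int) []uint16 {
	order := make([]uint16, ncases)
	norder := 0
	for i := 0; i < ncases; i++ {
		j := rand.Intn(norder + 1) // stand-in for fastrandn(uint32(norder+1))
		order[norder] = order[j]
		order[j] = uint16(i)
		norder++
	}
	return order[:norder]
}

func main() {
	fmt.Println(shuffleOrder(5)) // some random permutation of 0..4
}
```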
How the different kinds of case are judged and handled
Receiving: the case receives data from a channel
If a goroutine is waiting on the channel's sendq to deliver data, goto recv: read the data straight from that goroutine and finish the select
If the channel's buffer is non-empty, goto bufrecv: read from the buffer, then finish the select
If the channel is closed, goto rclose: read the zero value and finish the select
Sending: the case sends data to a channel
If the channel is already closed, goto sclose (which panics) and finish the select
If a goroutine is waiting on the channel's recvq to receive data, goto send: deliver the data to it directly
If the channel's buffer still has room, goto bufsend: write the data into the buffer
Default: simply unlock all the channels and leave selectgo; this also shows that the send/receive statements inside such a select are non-blocking
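The rclose path in particular is easy to observe from user code: a receive case on a closed channel is always ready and yields the element type's zero value:

```go
package main

import "fmt"

// recvFromClosed shows the rclose behavior: a receive case on a closed
// channel is always ready and yields the element type's zero value.
func recvFromClosed() (int, bool) {
	ch := make(chan int)
	close(ch)
	select {
	case v, ok := <-ch:
		return v, ok // zero value, ok == false
	default:
		return -1, false // never taken: the closed channel is always ready
	}
}

func main() {
	v, ok := recvFromClosed()
	fmt.Println(v, ok) // prints "0 false"
}
```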
Summary
All in all, select does quite a bit of preparation when it runs. First come the compile-time optimizations: an empty select becomes block(), which parks the current goroutine forever, and a select with only a default case returns that case directly. Then selectgo runs: it generates a random polling order, pollorder, and a traversal order based on the channels' heap addresses, lockorder; it walks all cases in pollorder checking for a ready channel, and if it finds one it returns that case's index immediately. Otherwise it creates sudog structs, enqueues the current goroutine on the sendq and recvq of every channel involved, and calls gopark to hand control to the scheduler.
Note: not every select reaches selectgo; many forms are optimized away entirely by the compiler.