GMP 最核心的几个概念可以在 go 源码的 /Users/lixuanshan/go/go1.21.8/src/runtime/runtime2.go 找到。
可以看到 g m p 在源码中就叫这个名字,下面我们一点点分析其中的逻辑。
1 g
1 2 3 4 5 6 7 8 9 10 11 12 13 14
type g struct { // ... m *m //在 p 的代理下,负责执行当前 g 的 m;执行过程中,这个 m 可能会指向不同的 m,动态调整 // ... sched gobuf //sched是啥呢,看下面的 struct // ... }
type gobuf struct { sp uintptr//保存 CPU 的 rsp 寄存器的值,指向函数调用栈栈顶; pc uintptr//保存 CPU 的 rip 寄存器的值,指向程序下一条执行指令的地址; ret uintptr//保存系统调用的返回值; bp uintptr//保存 CPU 的 rbp 寄存器的值,存储函数栈帧的起始位置. for framepointer-enabled architectures
// ... // 从其他 p 窃取一半 procs := uint32(gomaxprocs) if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) }
// 听刘丹冰将无数次的 work-stealing,就是这里了!stealWork具体逻辑后面会粘 gp, inheritTime, tnow, w, newWork := stealWork(now) now = tnow if gp != nil { // Successfully stole. // 成功偷到,☺️ return gp, inheritTime, false } if newWork { // There may be new timer or GC work; restart to // discover. goto top } if w != 0 && (pollUntil == 0 || w < pollUntil) { // Earlier timer to wait for. pollUntil = w } }
// 从全局取一个 g,同时他还偷偷干了一个事:会额外将一个 g 从全局队列转移到 p 的本地队列,让全局队列中的 g 也得到更充分的执行机会. funcglobrunqget(_p_ *p, max int32) *g { if sched.runqsize == 0 { returnnil }
n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize } if max > 0 && n > max { n = max } if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 }
sched.runqsize -= n
// 全局队列弹出。这个 gp 是要返回给上层的。注意看,它下面还会继续弹出gp1,弹出 n 次! gp := sched.runq.pop() n-- for ; n > 0; n-- { gp1 := sched.runq.pop() // runqput 方法就是将一个 g 转移到 p 本地队列的执行,具体实现看下面 runqput(_p_, gp1, false) } return gp
// 将一个 g 转移到 p 本地队列的执行(从全局队列找 g 往里放会用,park 结束 ready 唤起也会用到) funcrunqput(_p_ *p, gp *g, next bool) { // ...
retry: h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) { // 倘若 p 的局部队列未满,则成功转移 g,将 p 的对尾索引 runqtail 值加 1 并解锁队列. _p_.runq[t%uint32(len(_p_.runq))].set(gp) atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption return } // 如果本地队列满了。还会给本地队列减负. 倘若发现本地队列 runq 已经满了,则会返回来,帮助当前 p 缓解执行压力,这部分内容位于 runqputslow 方法中.runqputslow 的内容请看下面 if runqputslow(_p_, gp, h, t) { return } // the queue is not full, now the put above must succeed goto retry
// 将本地队列中一半的 g 放回全局队列中 funcrunqputslow(_p_ *p, gp *g, h, t uint32)bool { var batch [len(_p_.runq)/2 + 1]*g // First, grab a batch from local queue. n := t - h n = n / 2 // ... for i := uint32(0); i < n; i++ { batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr() } if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume returnfalse } batch[n] = gp
// Link the goroutines. for i := uint32(0); i < n; i++ { batch[i].schedlink.set(batch[i+1]) } var q gQueue q.head.set(batch[0]) q.tail.set(batch[n])
// Now put the batch on global queue. lock(&sched.lock) globrunqputbatch(&q, int32(n+1)) unlock(&sched.lock) returntrue
// 从 p 本地队列中获取一个可执行的 goroutine funcrunqget(_p_ *p) (gp *g, inheritTime bool) { //倘若当前 p 的 runnext 非空,直接获取即可: if next != 0 && _p_.runnext.cas(next, 0) { return next.ptr(), true }
//加锁从 p 的本地队列中获取 g. //需要注意,虽然本地队列是属于 p 独有的,但是由于 work-stealing 机制的存在,其他 p 可能会前来执行窃取动作,因此操作仍需加锁.但是,由于窃取动作发生的频率不会太高,因此当前 p 取得锁的成功率是很高的,因此可以说p 的本地队列是接近于无锁化,但没有达到真正意义的无锁. for { // 这就是那个锁。一般不会互斥。 h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := _p_.runqtail if t == h { // 倘若本地队列为空,直接终止并返回; returnnil, false } // 倘若本地队列存在 g,则取得队首的 g,解锁并返回. gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume return gp, false } }
// . 窃取动作的核心逻辑 batch就是来偷东西的篮子(当前饥饿 p 的队列容器。我记得上面说过,p 队列里都有个 256 的数组! funcrunqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool)uint32 { for { // 每次对一个 p 尝试窃取前,会对其局部队列加锁;h 和 t 是俩锁,头和尾。记住这俩锁,最后偷完了要解锁的! h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer // 指针相减,得到 p 队列长度 n := t - h n = n - n/2 // 这里!窃取目标的一半! if n == 0 { if stealRunNextG { // Try to steal from _p_.runnext. if next := _p_.runnext; next != 0 { if _p_.status == _Prunning { if GOOS != "windows" && GOOS != "openbsd" && GOOS != "netbsd" { usleep(3) } else { osyield() } } if !_p_.runnext.cas(next, 0) { continue } batch[batchHead%uint32(len(batch))] = next return1 } } return0 } if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t continue } for i := uint32(0); i < n; i++ { g := _p_.runq[(h+i)%uint32(len(_p_.runq))] // 就偷你了,放到我的篮子里 batch[(batchHead+i)%uint32(len(batch))] = g } // 调整被偷玩的 head 指针。这里 h 放进去。应该是解锁了。 if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume return n //返回实际偷取的数量. } } }
6 execute
当 g0 为 m 寻找到可执行的 g 之后,接下来就开始执行 g. 这部分内容位于 runtime/proc.go 的 execute 方法中:
// 每隔一段时间就retake一次 funcretake(now int64)uint32 { n := 0 lock(&allpLock) 加锁后,遍历全局的 p 队列,寻找需要被抢占的目标 for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { // This can happen if procresize has grown // allp but not yet created new Ps. continue } pd := &_p_.sysmontick // ... if s == _Psyscall { // ... 倘若某个 p 同时满足下述条件(系统得找一个繁忙的时候,还不能老抢),则会进行抢占调度: I 执行系统调用超过 10 ms; II p 本地队列有等待执行的 g; III 或者当前没有空闲的 p 和 m. if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } unlock(&allpLock) // 抢占调度的步骤是,先将当前 p 的状态更新为 idle,然后步入 handoffp 方法中,判断是否需要为 p 寻找接管的 m(因为其原本绑定的 m 正在执行系统调用): if atomic.Cas(&_p_.status, s, _Pidle) { n++ _p_.syscalltick++ handoffp(_p_) } incidlelocked(1) lock(&allpLock) } } unlock(&allpLock) returnuint32(n) }
funchandoffp(_p_ *p) { if !runqempty(_p_) || sched.runqsize != 0 { I 当前 p 本地队列还有待执行的 g、全局不是空 startm(_p_, false) return }
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { II 全局繁忙(没有空闲的 p 和 m,全局 g 队列为空) startm(_p_, true) return } // 这一锁,你就知道是要调全局队列了! lock(&sched.lock) // ... if sched.runqsize != 0 { unlock(&sched.lock) // 全局不是空 startm(_p_, false) return } // If this is the last running P and nobody is polling network, // need to wakeup another M to poll network. if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 { unlock(&sched.lock) III 需要处理网络 socket 读写请求 startm(_p_, false) return }
// ... // 保存当前 g 的执行环境; save(pc, sp) _g_.syscallsp = sp _g_.syscallpc = pc // 将 g 状态更新为 syscall; casgstatus(_g_, _Grunning, _Gsyscall) // ...
// 解除 p 和 当前 m 之间的绑定,因为 m 即将进入系统调用而导致短暂不可用; pp := _g_.m.p.ptr() pp.m = 0
// 将 p 添加到 当前 m 的 oldP 容器当中,后续 m 恢复后,会优先寻找旧的 p 重新建立绑定关系.这里是为了保证亲缘性。在之前介绍 p 的 next 指针式也有说过 _g_.m.oldp.set(pp) _g_.m.p = 0 // 将 p 的状态更新为 syscall; atomic.Store(&pp.status, _Psyscall) // ...
当 m 完成了内核态的系统调用之后,此时会步入位于 runtime/proc.go 的 exitsyscall 函数中,尝试寻找 p 重新开始运作:
funcexitsyscall0(gp *g) { // 将 g 由系统调用状态切换为可运行态, casgstatus(gp, _Gsyscall, _Grunnable) dropg()// 并解绑 g 和 m 的关系:
// 从全局 p 队列获取可用的 p, 注意,是获取 p,因为我们现在就是要给 g 找一个 p lock(&sched.lock) var _p_ *p if schedEnabled(gp) { _p_, _ = pidleget(0) } var locked bool if _p_ == nil { // 如若无 p 可用,则将 g 添加到全局队列,当前 m 陷入沉睡. 直到被唤醒后才会继续发起调度. globrunqput(gp) } unlock(&sched.lock) // 解锁,我们就知道,从全局 p 队列获取的方法完事了
// 如果获取到了,则执行 g if _p_ != nil { acquirep(_p_) execute(gp, false) // Never returns. } // ... stopm() schedule() // Never returns. }