核心数据结构

GMP 最核心的几个概念可以在 go 源码的 /Users/lixuanshan/go/go1.21.8/src/runtime/runtime2.go 找到。

可以看到 g m p 在源码中就叫这个名字,下面我们一点点分析其中的逻辑。

1 g

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type g struct {
// ...
m *m //在 p 的代理下,负责执行当前 g 的 m;执行过程中,这个 m 可能会指向不同的 m,动态调整
// ...
sched gobuf //sched是啥呢,看下面的 struct
// ...
}


type gobuf struct {
sp uintptr //保存 CPU 的 rsp 寄存器的值,指向函数调用栈栈顶;
pc uintptr //保存 CPU 的 rip 寄存器的值,指向程序下一条执行指令的地址;
ret uintptr //保存系统调用的返回值;
bp uintptr //保存 CPU 的 rbp 寄存器的值,存储函数栈帧的起始位置. for framepointer-enabled architectures

其中 g 的生命周期由以下几种状态组成:

1
2
3
4
5
6
7
8
9
10
const(
_Gidle = itoa // 0 协程开始创建时的状态,此时尚未初始化完成;
_Grunnable // 1 协程在待执行队列中,等待被执行;
_Grunning // 2 协程正在执行,同一时刻一个 p 中只有一个 g 处于此状态;
_Gsyscall // 3 协程正在执行系统调用;内核态的阻塞。比如需要输入。
_Gwaiting // 4 协程处于挂起态,需要等待被唤醒. gc、channel 通信或者锁操作时经常会进入这种状态;
_Gdead // 6 协程刚初始化完成或者已经被销毁,会处于此状态;
_Gcopystack // 8 协程正在栈扩容流程中;
_Gpreempted // 9 协程被抢占后的状态.
)

2 m

1
2
3
4
5
6
type m struct {
g0 *g // goroutine with scheduling stack 一类特殊的调度协程,不用于执行用户函数,负责执行 g 之间的切换调度. 与 m 的关系为 1:1;
// ...
tls [tlsSlots]uintptr // thread-local storage (for x86 extern register) thread-local storage,线程本地存储,存储内容只对当前线程可见. 线程本地存储的是 m.tls 的地址,m.tls[0] 存储的是当前运行的 g,因此线程可以通过 g 找到当前的 m、p、g0 等信息.
// ...
}

3 p

1
2
3
4
5
6
7
8
9
type p struct {
// ...
runqhead uint32
runqtail uint32
runq [256]guintptr //本地 goroutine 队列,最大长度为 256.记住这个 256,一会儿会考

runnext guintptr //下一个可执行的 goroutine.
// ...
}

runnext 的一个用处:
为了保证亲缘性。一个 gopark 阻塞的g,如果变成 runable后,想要快速被它原来绑定的p 运行,就使用过runnext来进行指向。(否则没有这个 runnext,g 唤醒后只能被放到 p 的本地队列队尾,这时候可能很久都排不到,最后被 steal,从而丧失亲缘性)

schedt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type schedt struct {
// ...
lock mutex //一把操作全局队列时使用的锁;

midle muintptr // idle m's waiting for work 空闲的 m
pidle puintptr // idle p's 空闲的 p

// ...
runq gQueue //全局 goroutine 队列;
runqsize int32 //全局 goroutine 队列的容量.
// ...


}

调度流程

1 两种 g 的转换

goroutine 的类型可分为两类:

  • I 负责调度普通 g 的 g0,执行固定的调度流程,与 m 的关系为一对一;

  • II 负责执行用户函数的普通 g.

m 通过 p 调度执行的 goroutine 永远在普通 g 和 g0 之间进行切换, runtime/stubs.go 文件中:

1
2
3
4
// gogo 和 mcall 可以理解为对偶关系
func gogo(buf *gobuf) // 当 g0 找到可执行的 g 时,会调用 gogo 方法,调度 g 执行用户定义的任务;
// ...
func mcall(fn func(*g)) // 当 g 需要主动让渡或被动调度时,会触发 mcall 方法,将执行权重新交还给 g0.

为啥要让 g0 负责调度呢?g0 的栈空间很大。是直接分配在线程栈的。

而 goroutine 的栈空间很小。go 里很多源码方法都标记了 go:no-split,就是不支持栈增长(编译器不会在执行这个函数的过程中检测栈空间)。这时候在 g0 执行可以防止栈溢出。

我们所探讨的这个 GMP 调度,更多的是聊的在 g0 这个视角下发生的故事。

2 调度类型

通常,调度指的是由 g0 按照特定策略找到下一个可执行 g 的过程. 而本小节谈及的调度类型是广义上的“调度”,指的是调度器 p 实现从执行一个 g 切换到另一个 g 的过程.

这种广义“调度”可分为几种类型:

(1)主动调度

一种用户主动执行让渡的方式,主要方式是,用户在执行代码中调用了 runtime.Gosched 方法,此时当前 g 会当让出p执行权,主动进行队列等待下次被调度执行.

代码位于 runtime/proc.go

1
2
3
4
5
func Gosched() {
checkTimeouts()
mcall(gosched_m)
}
运行后,g 会从 runing 变成 runable,然后投递到全局队列

(2)被动调度

因当前不满足某种执行条件,g 可能会陷入阻塞态无法被调度,直到关注的条件达成后,g才从阻塞中被唤醒,重新进入可执行队列等待被调度.

常见的被动调度触发方式为因 channel 操作或互斥锁操作陷入阻塞等操作,底层会走进 gopark 方法.

代码位于 runtime/proc.go

1
2
3
4
5
6
7
8
9
10
11
12
func gopark(unlockf func(*g, unsafe.Poi nter) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
// ...
mcall(park_m)
} // mcall 唤起go,执行完 g 变成 waiting, 同时 g 与 m 解绑

func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}

//goready 方法通常与 gopark 方法成对出现,能够将 g 从阻塞态中恢复,重新进入等待执行的状态.

(3)正常调度:

g 中的执行任务已完成,g0 会将当前 g 置为死亡状态,发起新一轮调度.

(4)抢占调度:

值得一提的是,前 3 种调度方式都由 m 下的 g0 完成,唯独抢占调度不同.

由一个存在的全局监控 g 完成

倘若 g 执行系统调用(系统调用是内核态的。这是最可怕的)超过指定的时长,且全局的 p 资源比较紧缺,此时将 p 和 g 解绑,抢占出来用于其他 g 的调度. 等 g 完成系统调用后,会重新进入可执行队列中等待被调度.

注意,由于这个阻塞发生在内核态。所以这时的 m 也是无法自我完成跳出,所以才必须由全局监控 g 来打破

这个抢占,是 p 与 [m-g] 解绑, p 就去找其他 m 了

因为发起系统调用时需要打破用户态的边界进入内核态,此时 m 也会因系统调用而陷入僵直,无法主动完成抢占调度的行为.

因此,在 Golang 进程会有一个全局监控协程 monitor g 的存在,这个 g 会越过 p 直接与一个 m 进行绑定,不断轮询对所有 p 的执行状况进行监控. 倘若发现满足抢占调度的条件,则会从第三方的角度出手干预,主动发起该动作.

3 宏观调度流程

集齐各部分理论碎片之后,我们可以尝试对 gmp 的宏观调度流程进行整体串联:

(1)以 g0 -> g -> g0 的一轮循环为例进行串联;

(2)g0 执行 schedule() 函数,寻找到用于执行的 g;

(3)g0 执行 execute() 方法,更新当前 g、p 的状态信息,并调用 gogo() 方法,将执行权交给 g;

(4)g 因主动让渡( gosche_m() )、被动调度( park_m() )、正常结束( goexit0() )等原因,调用 m_call 函数,执行权重新回到 g0 手中;

(5)g0 执行 schedule() 函数,开启新一轮循环.

4 schedule

调度流程的主干方法是位于 runtime/proc.go 中的 schedule 函数,此时的执行权位于 g0 手中:

1
2
3
4
5
6
7
8
9
10
// g0 开始工作:
func schedule() {
// ... 取出 gp,就是要执行的 g,怎么找,请看4.5
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available


// ...
// 交接给 gp(用户的 g)
execute(gp, inheritTime)
}

(1)寻找到下一个执行的 goroutine;

(2)执行该 goroutine.

5 findRunnable

调度流程中,一个非常核心的步骤,就是为 m 寻找到下一个执行的 g,这部分内容位于 runtime/proc.go 的 findRunnable 方法中:

I 取得 p 本地队列队首的索引,同时对本地队列加锁:

1
h := atomic.LoadAcq(&_p_.runqhead)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
_g_ := getg()


top:
_p_ := _g_.m.p.ptr()
// ...
// p 的总调度次数 存在于 schedtick, 记住这个schedtick,gogo 之前它就会++,下面有呼应
// 每 61 次调度后,会从全局队列中取一次。避免本地 p 过于繁忙,而全局 p 队列一直无法唤起,这写死的 61 应该是经验之谈
if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
// globrunqget 就是从全局取。具体代码可以看下面。
gp = globrunqget(_p_, 1)
unlock(&sched.lock)
if gp != nil {
// 取到就直接返回了
return gp, false, false
}
}

// ...
// 这里就是从本地 p 队列获取. runqget的逻辑下面会粘
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime, false
}

// ...
// 本地没拿到,继续从全局找
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}



// 本地也没有、全局也没有:寻找I/O事件已经就绪的、待唤起的g
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
// 需要注意的是,刚获取网络协程时,g 的状态是处于 waiting 的,因此需要先更新为 runnable 状态.
casgstatus(gp, _Gwaiting, _Grunnable)
return gp, false, false
}
}


// ...
// 从其他 p 窃取一半
procs := uint32(gomaxprocs)
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}

// 听刘丹冰将无数次的 work-stealing,就是这里了!stealWork具体逻辑后面会粘
gp, inheritTime, tnow, w, newWork := stealWork(now)
now = tnow
if gp != nil {
// Successfully stole.
// 成功偷到,☺️
return gp, inheritTime, false
}
if newWork {
// There may be new timer or GC work; restart to
// discover.
goto top
}
if w != 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
pollUntil = w
}
}


//
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 从全局取一个 g,同时他还偷偷干了一个事:会额外将一个 g 从全局队列转移到 p 的本地队列,让全局队列中的 g 也得到更充分的执行机会.
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 {
return nil
}


n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}


sched.runqsize -= n

// 全局队列弹出。这个 gp 是要返回给上层的。注意看,它下面还会继续弹出gp1,弹出 n 次!
gp := sched.runq.pop()
n--
for ; n > 0; n-- {
gp1 := sched.runq.pop()
// runqput 方法就是将一个 g 转移到 p 本地队列的执行,具体实现看下面
runqput(_p_, gp1, false)
}
return gp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 将一个 g 转移到 p 本地队列的执行(从全局队列找 g 往里放会用,park 结束 ready 唤起也会用到)
func runqput(_p_ *p, gp *g, next bool) {
// ...

retry:
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail

if t-h < uint32(len(_p_.runq)) {
// 倘若 p 的局部队列未满,则成功转移 g,将 p 的对尾索引 runqtail 值加 1 并解锁队列.
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// 如果本地队列满了。还会给本地队列减负. 倘若发现本地队列 runq 已经满了,则会返回来,帮助当前 p 缓解执行压力,这部分内容位于 runqputslow 方法中.runqputslow 的内容请看下面
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 将本地队列中一半的 g 放回全局队列中
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
var batch [len(_p_.runq)/2 + 1]*g
// First, grab a batch from local queue.
n := t - h
n = n / 2

// ...
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return false
}

batch[n] = gp


// Link the goroutines.
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
var q gQueue
q.head.set(batch[0])
q.tail.set(batch[n])


// Now put the batch on global queue.
lock(&sched.lock)
globrunqputbatch(&q, int32(n+1))
unlock(&sched.lock)
return true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 从 p 本地队列中获取一个可执行的 goroutine
func runqget(_p_ *p) (gp *g, inheritTime bool) {
//倘若当前 p 的 runnext 非空,直接获取即可:
if next != 0 && _p_.runnext.cas(next, 0) {
return next.ptr(), true
}


//加锁从 p 的本地队列中获取 g.
//需要注意,虽然本地队列是属于 p 独有的,但是由于 work-stealing 机制的存在,其他 p 可能会前来执行窃取动作,因此操作仍需加锁.但是,由于窃取动作发生的频率不会太高,因此当前 p 取得锁的成功率是很高的,因此可以说p 的本地队列是接近于无锁化,但没有达到真正意义的无锁.
for {
// 这就是那个锁。一般不会互斥。
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
// 倘若本地队列为空,直接终止并返回;
return nil, false
}
// 倘若本地队列存在 g,则取得队首的 g,解锁并返回.
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 偷!
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
pp := getg().m.p.ptr()


ranTimer := false


const stealTries = 4
// 偷取操作至多会遍历全局的 p 队列 4 次,过程中只要找到可窃取的 p 则会立即返回.
for i := 0; i < stealTries; i++ {
stealTimersOrRunNextG := i == stealTries-1

// fastrand()注意这里,随机。
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
// ...
}
}


return nil, false, now, pollUntil, ranTime

为保证窃取行为的公平性,遍历的起点是随机的. 步长是一个随机的质数,这里可以通过上面的 stealOrder 类型源码看出:

1
2
3
4
5
6
7
func (ord *randomOrder) start(i uint32) randomEnum {
return randomEnum{
count: ord.count,
pos: i % ord.count,
inc: ord.coprimes[i/ord.count%uint32(len(ord.coprimes))], // coprimes 是质数合集、inc 就是每次新增的步长,可以看出。pos 和 inc 都是通过随机数种子 i 来生成的
}
}

窃取动作的核心逻辑位于 runqgrab 方法当中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// . 窃取动作的核心逻辑 batch就是来偷东西的篮子(当前饥饿 p 的队列容器。我记得上面说过,p 队列里都有个 256 的数组!
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
// 每次对一个 p 尝试窃取前,会对其局部队列加锁;h 和 t 是俩锁,头和尾。记住这俩锁,最后偷完了要解锁的!
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer


// 指针相减,得到 p 队列长度
n := t - h
n = n - n/2
// 这里!窃取目标的一半!
if n == 0 {
if stealRunNextG {
// Try to steal from _p_.runnext.
if next := _p_.runnext; next != 0 {
if _p_.status == _Prunning {

if GOOS != "windows" && GOOS != "openbsd" && GOOS != "netbsd" {
usleep(3)
} else {
osyield()
}
}
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
continue
}
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
// 就偷你了,放到我的篮子里
batch[(batchHead+i)%uint32(len(batch))] = g
}

// 调整被偷玩的 head 指针。这里 h 放进去。应该是解锁了。
if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n //返回实际偷取的数量.
}
}
}

6 execute

当 g0 为 m 寻找到可执行的 g 之后,接下来就开始执行 g. 这部分内容位于 runtime/proc.go 的 execute 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func execute(gp *g, inheritTime bool) {
_g_ := getg()


_g_.m.curg = gp //当前 g 确定。绑定到 m
gp.m = _g_.m // m 绑定到 gp
casgstatus(gp, _Grunnable, _Grunning) // 变为 running
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
// 更新 p 的总调度次数;刚刚 %61 那用过这个成员,是否记得?
_g_.m.p.ptr().schedtick++
}


gogo(&gp.sched) //gogo,前面提到过

7 gosched_m

位于 runtime/proc.go 文件中:

1
2
3
4
5
// g 执行主动让渡时,会调用 mcall 方法将归还给 g0
func Gosched() {
// ...
mcall(gosched_m) //执行权切换回 g0,并由 g0 调用 gosched_m 方法 gosched_m是啥,请看下面
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// gosched_m 执行的时候,已经是 g0 掌权了
func gosched_m(gp *g) {
goschedImpl(gp)
}


func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
// 将当前 g 的状态由执行中切换为待执行 _Grunnable:
casgstatus(gp, _Grunning, _Grunnable)
dropg() // g 与 m 解绑,具体逻辑下面会贴

// 将 g 添加到全局队列当中
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)


schedule() //开启新一轮的调度
1
2
3
4
5
6
7
8
// 解绑的具体逻辑
func dropg() {
_g_ := getg()


setMNoWB(&_g_.m.curg.m, nil) //m 干成 nil
setGNoWB(&_g_.m.curg, nil) //g 干成 nil
}

8 park_m 与 ready

g 需要被动调度时,会调用 mcall 方法切换至 g0,并调用 park_m 方法将 g 置为阻塞态,执行流程位于 runtime/proc.go 的 gopark 方法当中:

1
2
3
4
5
6
// 这里很有意思,我读代码读到这的时候,问了我老婆一句,park 不是公园的意思吗?小红球说:害有停车的意思。对哈。P 停车场不就是它么。这都忘了。
// 不得不说,如果让我起变量名,我这中式英语应该只会用 stop 这种!
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
// ...
mcall(park_m)
}
1
2
3
4
5
6
7
8
9
10
11
func park_m(gp *g) {
_g_ := getg()


//将当前 g 的状态由 running 改为 waiting;
casgstatus(gp, _Grunning, _Gwaiting)
dropg() //将 g 与 m 解绑;


// ...
schedule() //执行新一轮的调度 schedule.

当因被动调度陷入阻塞态的 g 需要被唤醒时,这部分不是g0要关心的。

唤醒会由其他协程执行 goready 方法(位于 runtime/proc.go)将 g 重新置为可执行的状态。

为啥是其他协程呢,因为阻塞后,唤醒的动作应该是由应用层来完成,比如channel 的实现里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 被动调度如果需要唤醒,则会其他 g 负责将 g 的状态由 waiting 改为 runnable,然后会将其添加到唤醒者的 p 的本地队列中。所以就是谁唤醒的它谁就放谁那。
func goready(gp *g, traceskip int) {
systemstack(func() {
// 这里systemstack会把执行权交给 g0, g0 会开始 ready 执行
ready(gp, traceskip, true)
})
}

func ready(gp *g, traceskip int, next bool) {
// ...
_g_ := getg() //拿到需要被唤醒的 g
// ...
//先将 g 的状态从阻塞态改为可执行的状态;
casgstatus(gp, _Gwaiting, _Grunnable)
// )调用 runqput 将当前 g 添加到唤醒者 p 的本地队列中,如果队列满了,会连带 g 一起将一半的元素转移到全局队列.
runqput(_g_.m.p.ptr(), gp, next)
// ...
}

9 goexit0

当 g 执行完成时,会先执行 mcall 方法切换至 g0,然后调用 goexit0 方法,内容为 runtime/proc.go:

1
2
3
4
5
// Finishes execution of the current goroutine.
func goexit1() {
// ...
mcall(goexit0)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func goexit0(gp *g) {
_g_ := getg()
_p_ := _g_.m.p.ptr()


casgstatus(gp, _Grunning, _Gdead)//将 g 状态置为 dead;
// ...
gp.m = nil
// ...


dropg() //解绑


// ...
schedule() //新一轮

10 retake

与 4.7-4.9 小节的区别在于,抢占调度的执行者不是 g0,而是一个全局的 monitor g,代码位于 runtime/proc.go 的 retake 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 每隔一段时间就retake一次
func retake(now int64) uint32 {
n := 0

lock(&allpLock) 加锁后,遍历全局的 p 队列,寻找需要被抢占的目标
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
// ...
if s == _Psyscall {
// ...
倘若某个 p 同时满足下述条件(系统得找一个繁忙的时候,还不能老抢),则会进行抢占调度:
I 执行系统调用超过 10 ms;
II p 本地队列有等待执行的 g;
III 或者当前没有空闲的 p 和 m.
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
unlock(&allpLock)
// 抢占调度的步骤是,先将当前 p 的状态更新为 idle,然后步入 handoffp 方法中,判断是否需要为 p 寻找接管的 m(因为其原本绑定的 m 正在执行系统调用):
if atomic.Cas(&_p_.status, s, _Pidle) {
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
当以下四个条件满足其一时,则需要为 p 获取新的 m:

func handoffp(_p_ *p) {
if !runqempty(_p_) || sched.runqsize != 0 {
I 当前 p 本地队列还有待执行的 g、全局不是空
startm(_p_, false)
return
}


if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) {
II 全局繁忙(没有空闲的 p 和 m,全局 g 队列为空)
startm(_p_, true)
return
}

// 这一锁,你就知道是要调全局队列了!
lock(&sched.lock)
// ...
if sched.runqsize != 0 {
unlock(&sched.lock)
// 全局不是空
startm(_p_, false)
return
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
unlock(&sched.lock)
III 需要处理网络 socket 读写请求
startm(_p_, false)
return
}


// ...

(5)获取 m 时,会先尝试获取已有的空闲的 m,若不存在,则会创建一个新的 m.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func startm(_p_ *p, spinning bool) {

mp := acquirem()
lock(&sched.lock)
// ...

nmp := mget()
if nmp == nil {
id := mReserveID()
unlock(&sched.lock)


var fn func()
// ...
newm(fn, _p_, id)
// ...
return
}
unlock(&sched.lock)
// ...
}

11 reentersyscall 和 exitsyscall

本小节同样与 g 的系统调用有关,但是视角切换回发生系统调用前,与 g 绑定的原 m 当中.

在 m 需要执行系统调用前,会先执行位于 runtime/proc.go 的 reentersyscall 的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func reentersyscall(pc, sp uintptr) {
// 此时执行权同样位于 m 的 g0 手中;
_g_ := getg()


// ...
// 保存当前 g 的执行环境;
save(pc, sp)
_g_.syscallsp = sp
_g_.syscallpc = pc
// 将 g 状态更新为 syscall;
casgstatus(_g_, _Grunning, _Gsyscall)
// ...


// 解除 p 和 当前 m 之间的绑定,因为 m 即将进入系统调用而导致短暂不可用;
pp := _g_.m.p.ptr()
pp.m = 0

// 将 p 添加到 当前 m 的 oldP 容器当中,后续 m 恢复后,会优先寻找旧的 p 重新建立绑定关系.这里是为了保证亲缘性。在之前介绍 p 的 next 指针式也有说过
_g_.m.oldp.set(pp)
_g_.m.p = 0
// 将 p 的状态更新为 syscall;
atomic.Store(&pp.status, _Psyscall)
// ...

当 m 完成了内核态的系统调用之后,此时会步入位于 runtime/proc.goexitsyscall 函数中,尝试寻找 p 重新开始运作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func exitsyscall() {
// 方法执行之初,此时的执行权是普通 g.
_g_ := getg()

// ...
// 倘若此前设置的 oldp 仍然可用,则重新和 oldP 绑定
if exitsyscallfast(oldp) {
// ...
// 将当前 g 重新置为 running 状态,然后开始执行后续的用户函数;
casgstatus(_g_, _Gsyscall, _Grunning)
// ...
return
}


// ...
// old 绑定失败,则调用 mcall 方法切换到 m 的 g0,并执行 exitsyscall0 方法, exitsyscall0 做什么下面会贴
mcall(exitsyscall0)
// ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func exitsyscall0(gp *g) {
// 将 g 由系统调用状态切换为可运行态,
casgstatus(gp, _Gsyscall, _Grunnable)
dropg()// 并解绑 g 和 m 的关系:

// 从全局 p 队列获取可用的 p, 注意,是获取 p,因为我们现在就是要给 g 找一个 p
lock(&sched.lock)
var _p_ *p
if schedEnabled(gp) {
_p_, _ = pidleget(0)
}

var locked bool
if _p_ == nil {
// 如若无 p 可用,则将 g 添加到全局队列,当前 m 陷入沉睡. 直到被唤醒后才会继续发起调度.
globrunqput(gp)
}

unlock(&sched.lock)
// 解锁,我们就知道,从全局 p 队列获取的方法完事了

// 如果获取到了,则执行 g
if _p_ != nil {
acquirep(_p_)
execute(gp, false) // Never returns.
}

// ...

stopm()
schedule() // Never returns.
}