go 的设计哲学:基于通信实现共享内存,而不是基于共享内存实现通讯!

基于共享内存实现通信,其实可能让通信的双方过多地入侵到了共享区域。

而 go 的设计思想就是,想通信,就基于我提供的中间模块。

这,就是 channel !

1 hchan 核心数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type hchan struct {
qcount uint // total data in the queue 当前 channel 中存在多少个元素;
dataqsiz uint // size of the circular queue 当前 channel 能存放的元素容量;
buf unsafe.Pointer // points to an array of dataqsiz elements channel 中用于存放元素的环形缓冲区;
elemsize uint16 // channel 元素类型的大小;
closed uint32 // 标识 channel 是否关闭;
elemtype *_type // element type \ channel 元素类型;
sendx uint // send index 发送元素进入环形缓冲区的 index;
recvx uint // receive index 接收元素所处的环形缓冲区的 index;
recvq waitq // list of recv waiters 因接收而陷入阻塞的协程队列;waitq是啥下面会贴
sendq waitq // list of send waiters 因发送而陷入阻塞的协程队列;

lock mutex // 并发安全必须有的东西
}

type waitq struct {//阻塞的协程队列
first *sudog //sudog 是啥,下面会贴
last *sudog
}

type sudog struct {
g *g //goroutine,协程;

next *sudog 队列中的下一个节点;
prev *sudog 队列中的前一个节点
elem unsafe.Pointer // data element (may point to stack) 读取/写入 channel 的数据的容器;

isSelect bool 标识当前协程是否处在 select 多路复用的流程中;

c *hchan 标识与当前 sudog 交互的 chan.
}

2 构造器函数

通过 switch 分支可以看出有3种类型的 channel:

  • 0 空间的(无缓冲 或 struct{})
  • 指针类型的
  • 非指针类型的(struct 类型)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func makechan(t *chantype, size int) *hchan {
elem := t.elem

// ...
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// mem 大小为 element 类型大小与 element 个数相乘后得到
// 仅当无缓冲型 channel 时,因size 为 0 导致 mem= 0;
// 当 elem 是 struct{} 类型,其 elem.size=0 也导致 mem=0
if overflow || mem > maxAlloc-hchanSize || size < 0 {
// 判断申请内存空间大小是否越界
panic(plainError("makechan: size out of range"))
}

var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
// hchanSize 是源码中写死的一个常量。hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers. 不是指针类型。(是 struct 类型)
// Allocate hchan and buf in one call.
//有缓冲的 struct 型,则一次性分配好 hchanSize + mem 大小的空间,并且调整 chan 的 buf 指向 mem 的起始位置;
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
// 有缓冲的 pointer 型,则分别申请 chan 和 buf 的空间,两者无需连续;
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

lockInit(&c.lock, lockRankHchan)

return
}

而指针类型和非指针类型的区别就在于,指针类型可以内存不连续(毕竟存的也是指针),buf 是单独分配的。

而非指针类型,是同时分配了 hchanSize+mem。

炫杉:而至于为什么要区分后面两种,我问了 ChatGPT,得到以下答案:

ChatGPT:这种区分是因为元素是否包含指针数据会影响到 channel 在内存分配和垃圾回收方面的处理方式。

当 elem.ptrdata 为 0 时,表示元素不包含指针数据,因此在内存分配和垃圾回收方面可以更加简单高效地处理。这种情况下,channel 的元素类型是非指针类型,不需要特殊的内存管理和垃圾回收操作。

当 elem.ptrdata 不为 0 时,表示元素包含指针数据,需要特殊的内存管理和垃圾回收操作来确保指针数据的正确性和内存安全性。这种情况下,channel 的元素类型是指针类型或包含指针的类型,需要额外的内存分配和垃圾回收处理。

3 写流程

3.1 异常处理 ch ==nil / ch is closed

炫杉:异常处理逻辑总是放在函数的最开始,满足就立马退出。这个思想在我们写任何代码时都应该学会!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
//chansend 方法下面会贴。这里有个伏笔,就是第三个参数传入了 true。于是一定还有一个传入 false 的版本。那就是 selectnbsend
}

// 看名字可以猜到,selectnbsend 是在 select 时的无阻塞channel。第三个参数决定了它不会阻塞。
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 对于未初始化的 chan,写入操作会引发死锁;
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

lock(&c.lock)

if c.closed != 0 {
// 对于已关闭的 chan,写入操作会引发 panic.
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// ...

3.2 case1:存在阻塞读协程

有人等着要呢,直接唤醒并喂到嘴里(越过缓冲区)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...

lock(&c.lock)

// ...

if sg := c.recvq.dequeue(); sg != nil { // 从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog;
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).直接给接受者,跳过缓冲区
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
// - 在 send 方法中,会基于 memmove 方法,直接将元素拷贝交给 sudog 对应的 goroutine;
// - 在 send 方法中会完成解锁动作.
return true
}

// ...

3.3 case2:无阻塞读协程 but环形缓冲区仍有空间

最简单,直接写入环形缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 将当前元素添加到环形缓冲区 sendx 对应的位置;
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)

c.sendx++
if c.sendx == c.dataqsiz {
// 环形缓冲。sendx等于 size 了就到头了
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

// ...
}

3.4 case3:无阻塞读协程&&环形缓冲区无空间

比较复杂,没人要也没地儿放,阻塞写协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)

// ...
gp := getg() //获取当前 g

// 构造封装当前 goroutine 的 sudog 对象;
mysg := acquireSudog()
// 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系;
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
// 构造完成

c.sendq.enqueue(mysg) //把 sudog 添加到当前 channel 的阻塞写协程队列中;

atomic.Store8(&gp.parkingOnChan, 1)
// park 当前协程;
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

// 能走到这说明当前协程从 park 中被唤醒
// 清理、回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走)
gp.waiting = nil
closed := !mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)

is closed {
if c.closed == 0 { //这里为啥再判断一次,且看我下面问 chatGPT
throw("chansed: spurious wakeup")
}
// 当前chan已close
panic(plainError("send on closed channel"))
}
return true
}

is closedsudog 这结构上一个专门用于 chan 和 g 通信用的成员 success,在 chan 因为 close 而唤醒 g 时,他会变成 false。但是为啥要再判断一次 c.closed == 0 呢?难道走进来的不必然是 closed==0 吗?问 chatGPT 的回答:

在通道已关闭的情况下,会进一步检查通道的关闭状态,确保关闭状态已经被正确地设置。如果 c.closed 的值为 0,则表示发生了异常情况,因为通道已经关闭但关闭状态却没有被正确设置。

3.5 写流程整体串联

  • 判断是不是 nil
  • 判断是不是已关闭
  • 判断能不能写
    • 有人阻塞读,直接给
    • 没人阻塞读,写到缓冲
    • 缓冲写不进。关联阻塞写队列与当前 goroutine。gopark
      • gopark 被唤醒,释放。
      • 如果因为close被唤醒

4 读流程

4.1 异常处理 ch==nil / ch is closed and empty

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
// park 挂起,引起死锁;
throw("unreachable")
}


lock(&c.lock)

if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
// 赋0值
typedmemclr(c.elemtype, ep)
}
// 关闭且无元素、直接解锁返回即可
return true, false
}
// The channel has been closed, but the channel's buffer have data.
// 如果关闭了。但是缓冲区有元素。这个 if 会直接结束。往下走到 case 2 的逻辑里
}
}

4.2 case1:有阻塞的写协程

倘若 channel 无缓冲区,则直接读取写协程元素,并唤醒写协程;

倘若 channel 有缓冲区,则读取缓冲区头部元素(保证 FIFO),并将写协程元素写入缓冲区尾部后唤醒写协程;

1
2
3
4
5
6
7
8
9
10
11
12
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

lock(&c.lock)

if sg := c.sendq.dequeue(); sg != nil {
// 从阻塞写协程队列中获取到一个写协程;
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
//解锁,返回.
return true, true
}
// ...
}

4.3 case2:无阻塞写协程&&缓冲区有元素

简单,直接从环形缓冲区读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
lock(&c.lock)
// ...
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
// 获取到 recvx 对应位置的元素;
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// ...

4.4 case3:无阻塞写协程&&缓冲区无元素

这次尴尬了,阻塞读协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
lock(&c.lock)
// ...
gp := getg()
mysg := acquireSudog() //- 构造封装当前 goroutine 的 sudog 对象;

// - 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系;
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
gp.param = nil

c.recvq.enqueue(mysg) //把 sudog 添加到当前 channel 的阻塞读协程队列中;
atomic.Store8(&gp.parkingOnChan, 1)
// park 当前协程;
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

// 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被写入);
gp.waiting = nil
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}

4.5 读流程整体

  • is nil?
    • throw
  • lock
  • is closed?
    • buff 区没有数据?
      • unLock,return
    • goto *
  • has send gorouinte?
    • buff区有数据?
      • 读出
      • 写入buff
      • 唤醒 send gorouinte
    • buff 区有数据?
      • 读出,解锁,返回
  • add goroutine to waitQ. (gopark)

5 阻塞与非阻塞模式

在上述源码分析流程中,均是以阻塞模式为主线进行讲述,忽略非阻塞模式的有关处理逻辑. 此处阐明两个问题:

  • 非阻塞模式下,流程逻辑有何区别?
  • 何时会进入非阻塞模式?

5.1 非阻塞模式逻辑区别

非阻塞模式下,读/写 channel 方法通过一个 bool 型的响应参数,用以标识是否读取/写入成功.

  • 所有需要使得当前 goroutine 被挂起的操作,在非阻塞模式下都会返回 false;
  • 所有是的当前 goroutine 会进入死锁的操作,在非阻塞模式下都会返回 false;
  • 所有能立即完成读取/写入操作的条件下,非阻塞模式下会返回 true.

5.2 何时进入非阻塞模式

默认情况下,读/写 channel 都是阻塞模式,只有在 select 语句组成的多路复用分支中,与 channel 的交互会变成非阻塞模式:

1
2
3
4
5
ch := make(chan int)
select{
case <- ch:
default:
}

5.3 代码一览

1
2
3
4
5
6
7
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}

在 select 语句包裹的多路复用分支中,读和写 channel 操作会被汇编为 selectnbrecv 和 selectnbsend 方法,底层同样复用 chanrecv 和 chansend 方法,但此时由于第三个入参 block 被设置为 false,导致后续会走进非阻塞的处理分支.

6 两种读 channel 的协议

读取 channel 时,可以根据第二个 bool 型的返回值用以判断当前 channel 是否已处于关闭状态:

1
2
3
ch := make(chan int, 2)
got1 := <- ch
got2,ok := <- ch

但这是怎么做到的呢?go 不是只允许固定的返回值个数吗?

实现上述功能的原因是,两种格式下,读 channel 操作会被汇编成不同的方法:

1
2
3
4
5
6
7
8
9
10
11
12
// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

7 关闭

关闭 channel 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func closechan(c *hchan) {
if c == nil {
// 关闭未初始化过的 channel 会 panic;
panic(plainError("close of nil channel"))
}

lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
// 重复关闭 channel 会 panic;
panic(plainError("close of closed channel"))
}

c.closed = 1

var glist gList
//将阻塞读协程队列中的协程节点统一添加到 glist;
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}

// 将阻塞写协程队列中的协程节点统一添加到 glist;(他们会 panic)
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
unlock(&c.lock)

// 唤醒 glist 当中的所有协程.
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3) // 至于 gp 是怎么 panic 的,可以看上面介绍的写时 gopark 阻塞,又被唤醒的逻辑。